# IGNITE-883: client reconnect issues
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/54bfa36c Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/54bfa36c Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/54bfa36c Branch: refs/heads/ignite-gg-10052 Commit: 54bfa36c7417109832effe9c59c0120d9249b1b9 Parents: f4b1123 Author: sboikov <sboi...@gridgain.com> Authored: Tue Jun 16 12:14:20 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Tue Jun 16 14:03:03 2015 +0300 ---------------------------------------------------------------------- .../ignite/spi/discovery/tcp/ClientImpl.java | 87 ++++++++++++++------ .../distributed/IgniteCacheManyClientsTest.java | 66 +++++++++++---- 2 files changed, 116 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/54bfa36c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index a17296c..fef6f4f 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -545,14 +545,31 @@ class ClientImpl extends TcpDiscoveryImpl { * @param msg Discovery message. * @return Latest topology snapshot. 
*/ - private NavigableSet<ClusterNode> updateTopologyHistory(long topVer, @Nullable TcpDiscoveryAbstractMessage msg) { + private Collection<ClusterNode> updateTopologyHistory(long topVer, @Nullable TcpDiscoveryAbstractMessage msg) { this.topVer = topVer; + if (!topHist.isEmpty() && topVer <= topHist.lastKey()) { + if (log.isDebugEnabled()) + log.debug("Skip topology update since topology already updated [msg=" + msg + + ", lastHistKey=" + topHist.lastKey() + + ", topVer=" + topVer + + ", locNode=" + locNode + ']'); + + Collection<ClusterNode> top = topHist.get(topVer); + + assert top != null : msg; + + return top; + } + NavigableSet<ClusterNode> allNodes = allVisibleNodes(); if (!topHist.containsKey(topVer)) { assert topHist.isEmpty() || topHist.lastKey() == topVer - 1 : - "lastVer=" + topHist.lastKey() + ", newVer=" + topVer + ", locNode=" + locNode + ", msg=" + msg; + "lastVer=" + (topHist.isEmpty() ? null : topHist.lastKey()) + + ", newVer=" + topVer + + ", locNode=" + locNode + + ", msg=" + msg; topHist.put(topVer, allNodes); @@ -886,7 +903,7 @@ class ClientImpl extends TcpDiscoveryImpl { * @param join {@code True} if reconnects during join. 
*/ protected Reconnector(boolean join) { - super(spi.ignite().name(), "tcp-client-disco-msg-worker", log); + super(spi.ignite().name(), "tcp-client-disco-reconnector", log); this.join = join; } @@ -944,7 +961,8 @@ class ClientImpl extends TcpDiscoveryImpl { sock.setKeepAlive(true); sock.setTcpNoDelay(true); - // Wait for + List<TcpDiscoveryAbstractMessage> msgs = null; + while (!isInterrupted()) { TcpDiscoveryAbstractMessage msg = spi.marsh.unmarshal(in, U.gridClassLoader()); @@ -955,12 +973,23 @@ class ClientImpl extends TcpDiscoveryImpl { if (res.success()) { msgWorker.addMessage(res); + if (msgs != null) { + for (TcpDiscoveryAbstractMessage msg0 : msgs) + msgWorker.addMessage(msg0); + } + success = true; } return; } } + else if (spi.ensured(msg)) { + if (msgs == null) + msgs = new ArrayList<>(); + + msgs.add(msg); + } } } catch (IOException | IgniteCheckedException e) { @@ -1286,23 +1315,32 @@ class ClientImpl extends TcpDiscoveryImpl { return; } - if (!topHist.isEmpty() && msg.topologyVersion() <= topHist.lastKey()) { - if (log.isDebugEnabled()) - log.debug("Discarding node add finished message since topology already updated " + - "[msg=" + msg + ", lastHistKey=" + topHist.lastKey() + ", node=" + node + ']'); - - return; - } + boolean evt = false; long topVer = msg.topologyVersion(); - node.order(topVer); - node.visible(true); + assert topVer > 0 : msg; + + if (!node.visible()) { + node.order(topVer); + node.visible(true); + + if (spi.locNodeVer.equals(node.version())) + node.version(spi.locNodeVer); + + evt = true; + } + else { + if (log.isDebugEnabled()) + log.debug("Skip node join event, node already joined [msg=" + msg + ", node=" + node + ']'); + + assert node.order() == topVer : node; + } - if (spi.locNodeVer.equals(node.version())) - node.version(spi.locNodeVer); + Collection<ClusterNode> top = updateTopologyHistory(topVer, msg); - NavigableSet<ClusterNode> top = updateTopologyHistory(topVer, msg); + assert top != null && top.contains(node) : "Topology 
does not contain node [msg=" + msg + + ", node=" + node + ", top=" + top + ']'; if (!pending && joinLatch.getCount() > 0) { if (log.isDebugEnabled()) @@ -1311,9 +1349,11 @@ class ClientImpl extends TcpDiscoveryImpl { return; } - notifyDiscovery(EVT_NODE_JOINED, topVer, node, top); + if (evt) { + notifyDiscovery(EVT_NODE_JOINED, topVer, node, top); - spi.stats.onNodeJoined(); + spi.stats.onNodeJoined(); + } } } @@ -1340,7 +1380,7 @@ class ClientImpl extends TcpDiscoveryImpl { return; } - NavigableSet<ClusterNode> top = updateTopologyHistory(msg.topologyVersion(), msg); + Collection<ClusterNode> top = updateTopologyHistory(msg.topologyVersion(), msg); if (!pending && joinLatch.getCount() > 0) { if (log.isDebugEnabled()) @@ -1383,7 +1423,7 @@ class ClientImpl extends TcpDiscoveryImpl { return; } - NavigableSet<ClusterNode> top = updateTopologyHistory(msg.topologyVersion(), msg); + Collection<ClusterNode> top = updateTopologyHistory(msg.topologyVersion(), msg); if (!pending && joinLatch.getCount() > 0) { if (log.isDebugEnabled()) @@ -1555,7 +1595,7 @@ class ClientImpl extends TcpDiscoveryImpl { * @param node Node. * @param top Topology snapshot. */ - private void notifyDiscovery(int type, long topVer, ClusterNode node, NavigableSet<ClusterNode> top) { + private void notifyDiscovery(int type, long topVer, ClusterNode node, Collection<ClusterNode> top) { notifyDiscovery(type, topVer, node, top, null); } @@ -1564,8 +1604,9 @@ class ClientImpl extends TcpDiscoveryImpl { * @param topVer Topology version. * @param node Node. * @param top Topology snapshot. + * @param data Optional custom message data. 
*/ - private void notifyDiscovery(int type, long topVer, ClusterNode node, NavigableSet<ClusterNode> top, + private void notifyDiscovery(int type, long topVer, ClusterNode node, Collection<ClusterNode> top, @Nullable DiscoverySpiCustomMessage data) { DiscoverySpiListener lsnr = spi.lsnr; @@ -1589,7 +1630,7 @@ class ClientImpl extends TcpDiscoveryImpl { } /** - * + * @return Queue size. */ public int queueSize() { return queue.size(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/54bfa36c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java index 62c7c1a..947ded2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java @@ -17,9 +17,11 @@ package org.apache.ignite.internal.processors.cache.distributed; +import junit.framework.*; import org.apache.ignite.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.spi.communication.tcp.*; import org.apache.ignite.spi.discovery.tcp.*; @@ -102,16 +104,14 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ - public void testManyClients() throws Exception { + public void testManyClients() throws Throwable { manyClientsPutGet(); } /** * @throws Exception If failed. 
*/ - public void testManyClientsClientDiscovery() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-883"); - + public void testManyClientsClientDiscovery() throws Throwable { clientDiscovery = true; manyClientsPutGet(); @@ -121,8 +121,6 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testManyClientsSequentiallyClientDiscovery() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-883"); - clientDiscovery = true; manyClientsSequentially(); @@ -162,33 +160,48 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest { log.info("All clients started."); - assertEquals(SRVS + CLIENTS, G.allGrids().size()); + checkNodes(SRVS + CLIENTS); + + for (Ignite client : clients) + client.close(); + } + + /** + * @param expCnt Expected number of nodes. + */ + private void checkNodes(int expCnt) { + assertEquals(expCnt, G.allGrids().size()); long topVer = -1L; for (Ignite ignite : G.allGrids()) { - assertEquals(SRVS + CLIENTS, ignite.cluster().nodes().size()); + log.info("Check node: " + ignite.name()); if (topVer == -1L) topVer = ignite.cluster().topologyVersion(); else - assertEquals(topVer, ignite.cluster().topologyVersion()); - } + assertEquals("Unexpected topology version for node: " + ignite.name(), + topVer, + ignite.cluster().topologyVersion()); - for (Ignite client : clients) - client.close(); + assertEquals("Unexpected number of nodes for node: " + ignite.name(), + expCnt, + ignite.cluster().nodes().size()); + } } /** * @throws Exception If failed. 
*/ - private void manyClientsPutGet() throws Exception { + private void manyClientsPutGet() throws Throwable { client = true; final AtomicInteger idx = new AtomicInteger(SRVS); final AtomicBoolean stop = new AtomicBoolean(); + final AtomicReference<Throwable> err = new AtomicReference<>(); + final int THREADS = 50; final CountDownLatch latch = new CountDownLatch(THREADS); @@ -224,7 +237,7 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest { counted = true; - while (!stop.get()) { + while (!stop.get() && err.get() == null) { key = rnd.nextInt(0, 1000); cache.put(key, iter++); @@ -240,6 +253,8 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest { return null; } catch (Throwable e) { + err.compareAndSet(null, e); + log.error("Unexpected error in client thread: " + e, e); throw e; @@ -257,6 +272,29 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest { Thread.sleep(10_000); + Throwable err0 = err.get(); + + if (err0 != null) + throw err0; + + boolean wait = GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + try { + checkNodes(SRVS + THREADS); + + return true; + } + catch (AssertionFailedError e) { + log.info("Check failed, will retry: " + e); + } + + return false; + } + }, 10_000); + + if (!wait) + checkNodes(SRVS + THREADS); + log.info("Stop clients."); stop.set(true);