# IGNITE-709 Fix tests: sometimes the client didn't receive discovery data.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/fd94eab2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/fd94eab2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/fd94eab2 Branch: refs/heads/ignite-709_3 Commit: fd94eab2c78bc475a27187b0c54bd239bcb6b731 Parents: 9986301 Author: sevdokimov <sevdoki...@gridgain.com> Authored: Fri May 15 19:05:09 2015 +0300 Committer: sevdokimov <sevdoki...@gridgain.com> Committed: Fri May 15 19:05:09 2015 +0300 ---------------------------------------------------------------------- .../discovery/tcp/TcpClientDiscoverySpi.java | 20 ++++----- .../spi/discovery/tcp/TcpDiscoverySpi.java | 11 ++++- .../TcpDiscoveryNodeAddFinishedMessage.java | 43 ++++++++++++++++++++ .../tcp/TcpClientDiscoverySpiSelfTest.java | 38 ++++++++++++++--- 4 files changed, 96 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fd94eab2/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java index ad9f947..e9ddbfd 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java @@ -1223,16 +1223,6 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp if (msg.topologyHistory() != null) topHist.putAll(msg.topologyHistory()); - - Map<UUID, Map<Integer, byte[]>> dataMap = msg.oldNodesDiscoveryData(); - - if (dataMap != null) { - for (Map.Entry<UUID, Map<Integer, byte[]>> entry : 
dataMap.entrySet()) - onExchange(newNodeId, entry.getKey(), entry.getValue(), null); - } - - locNode.setAttributes(node.attributes()); - locNode.visible(true); } else if (log.isDebugEnabled()) log.debug("Discarding node added message with empty topology: " + msg); @@ -1265,6 +1255,16 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp if (getLocalNodeId().equals(msg.nodeId())) { if (joinLatch.getCount() > 0) { + Map<UUID, Map<Integer, byte[]>> dataMap = msg.clientDiscoData(); + + if (dataMap != null) { + for (Map.Entry<UUID, Map<Integer, byte[]>> entry : dataMap.entrySet()) + onExchange(getLocalNodeId(), entry.getKey(), entry.getValue(), null); + } + + locNode.setAttributes(msg.clientNodeAttributes()); + locNode.visible(true); + long topVer = msg.topologyVersion(); locNode.order(topVer); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fd94eab2/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index fb64764..170670f 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -3574,7 +3574,16 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov if (msg.verified()) { stats.onRingMessageReceived(msg); - processNodeAddFinishedMessage(new TcpDiscoveryNodeAddFinishedMessage(locNodeId, node.id())); + TcpDiscoveryNodeAddFinishedMessage addFinishMsg = new TcpDiscoveryNodeAddFinishedMessage(locNodeId, + node.id()); + + if (node.isClient()) { + addFinishMsg.clientDiscoData(msg.oldNodesDiscoveryData()); + + addFinishMsg.clientNodeAttributes(node.attributes()); + } + + 
processNodeAddFinishedMessage(addFinishMsg); addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id())); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fd94eab2/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java index 5a71eb3..1d974e1 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java @@ -17,7 +17,9 @@ package org.apache.ignite.spi.discovery.tcp.messages; +import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; import java.util.*; @@ -34,6 +36,17 @@ public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractMess private final UUID nodeId; /** + * Client node can not get discovery data from TcpDiscoveryNodeAddedMessage, we have to pass discovery data in + * TcpDiscoveryNodeAddFinishedMessage + */ + @GridToStringExclude + private Map<UUID, Map<Integer, byte[]>> clientDiscoData; + + /** */ + @GridToStringExclude + private Map<String, Object> clientNodeAttrs; + + /** * Constructor. * * @param creatorNodeId ID of the creator node (coordinator). @@ -54,6 +67,36 @@ public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractMess return nodeId; } + /** + * @return Discovery data for joined client. + */ + public Map<UUID, Map<Integer, byte[]>> clientDiscoData() { + return clientDiscoData; + } + + /** + * @param clientDiscoData Discovery data for joined client. 
+ */ + public void clientDiscoData(@Nullable Map<UUID, Map<Integer, byte[]>> clientDiscoData) { + this.clientDiscoData = clientDiscoData; + + assert clientDiscoData == null || !clientDiscoData.containsKey(nodeId); + } + + /** + * @return Client node attributes. + */ + public Map<String, Object> clientNodeAttributes() { + return clientNodeAttrs; + } + + /** + * @param clientNodeAttrs New client node attributes. + */ + public void clientNodeAttributes(Map<String, Object> clientNodeAttrs) { + this.clientNodeAttrs = clientNodeAttrs; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(TcpDiscoveryNodeAddFinishedMessage.class, this, "super", super.toString()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fd94eab2/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java index 87b63c4..3afa9bc 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java @@ -29,6 +29,7 @@ import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.resources.*; import org.apache.ignite.spi.*; +import org.apache.ignite.spi.discovery.tcp.internal.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; import org.apache.ignite.spi.discovery.tcp.messages.*; @@ -714,11 +715,9 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { IgniteMessaging msg = grid(masterName).message(); - UUID id = null; + UUID id = msg.remoteListen(null, new 
MessageListener()); try { - id = msg.remoteListen(null, new MessageListener()); - msgLatch = new CountDownLatch(2); msg.send(null, "Message 1"); @@ -737,12 +736,41 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { await(msgLatch); } finally { - if (id != null) - msg.stopRemoteListen(id); + msg.stopRemoteListen(id); } } /** + * @throws Exception If failed. + */ + public void testDataExchangeFromServer2() throws Exception { + startServerNodes(2); + + IgniteMessaging msg = grid("server-1").message(); + + UUID id = msg.remoteListen(null, new MessageListener()); + + try { + startClientNodes(1); + + assertEquals(G.ignite("server-0").cluster().localNode().id(), ((TcpDiscoveryNode)G.ignite("client-0") + .cluster().localNode()).clientRouterNodeId()); + + checkNodes(2, 1); + + msgLatch = new CountDownLatch(3); + + msg.send(null, "Message"); + + await(msgLatch); + } + finally { + msg.stopRemoteListen(id); + } + } + + + /** * @throws Exception If any error occurs. */ public void testDuplicateId() throws Exception {