Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-80 1de5fe5aa -> 783ea8b0b
#IGNITE-80 Debug Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/783ea8b0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/783ea8b0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/783ea8b0 Branch: refs/heads/ignite-80 Commit: 783ea8b0b704d419d0a7ad491dd0f14d23287fa0 Parents: 1de5fe5 Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Thu Apr 16 18:00:05 2015 -0700 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Thu Apr 16 18:00:05 2015 -0700 ---------------------------------------------------------------------- .../spi/discovery/tcp/TcpDiscoverySpi.java | 53 +++++++++++--------- .../TcpDiscoveryNodeAddFinishedMessage.java | 40 ++++++++++++++- 2 files changed, 69 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/783ea8b0/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 1136d06..267f707 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -3492,7 +3492,11 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov if (msg.verified()) { stats.onRingMessageReceived(msg); - processNodeAddFinishedMessage(new TcpDiscoveryNodeAddFinishedMessage(locNodeId, node.id())); + processNodeAddFinishedMessage(new TcpDiscoveryNodeAddFinishedMessage( + locNodeId, + node.id(), + msg.newNodeDiscoveryData(), + msg.oldNodesDiscoveryData())); addMessage(new 
TcpDiscoveryDiscardMessage(locNodeId, msg.id())); @@ -3590,11 +3594,6 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov if (topChanged) { assert !node.visible() : "Added visible node [node=" + node + ", locNode=" + locNode + ']'; - Map<Integer, byte[]> data = msg.newNodeDiscoveryData(); - - if (data != null) - onExchange(node.id(), node.id(), node.order(), data, U.gridClassLoader()); - msg.addDiscoveryData(locNodeId, collectExchangeData(node.id())); } @@ -3604,9 +3603,6 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov } if (msg.verified() && locNodeId.equals(node.id())) { - // Discovery data. - Map<UUID, Map<Integer, byte[]>> dataMap; - synchronized (mux) { if (spiState == CONNECTING && locNode.internalOrder() != node.internalOrder()) { // Initialize topology. @@ -3630,8 +3626,6 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov if (log.isDebugEnabled()) log.debug("Restored topology from node added message: " + ring); - dataMap = msg.oldNodesDiscoveryData(); - topHist.clear(); topHist.putAll(msg.topologyHistory()); @@ -3642,7 +3636,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov msg.messages(null, null); msg.topology(null); msg.topologyHistory(null); - msg.clearDiscoveryData(); + // TODO IGNITE-80 how to clear discovery data? msg.clearDiscoveryData(); } else { if (log.isDebugEnabled()) @@ -3661,17 +3655,6 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov return; } } - - // Notify outside of synchronized block. 
- if (dataMap != null) { - for (Map.Entry<UUID, Map<Integer, byte[]>> entry : dataMap.entrySet()) { - onExchange(node.id(), - entry.getKey(), - node.order(), - entry.getValue(), - U.gridClassLoader()); - } - } } if (ring.hasRemoteNodes()) @@ -3744,6 +3727,30 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov fireEvt = true; } + + if (!locNodeId.equals(nodeId)) { + msg.addDiscoveryData(locNodeId, collectExchangeData(msg.nodeId())); + + Map<Integer, byte[]> data = msg.newNodeDiscoveryData(); + + if (data != null) + onExchange(node.id(), node.id(), node.order(), data, U.gridClassLoader()); + } + else { + // Discovery data. + Map<UUID, Map<Integer, byte[]>> dataMap = msg.oldNodesDiscoveryData(); + + // Notify outside of synchronized block. + if (dataMap != null) { + for (Map.Entry<UUID, Map<Integer, byte[]>> entry : dataMap.entrySet()) { + onExchange(node.id(), + entry.getKey(), + node.order(), + entry.getValue(), + U.gridClassLoader()); + } + } + } } if (msg.verified() && !locNodeId.equals(nodeId) && spiStateCopy() == CONNECTED && fireEvt) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/783ea8b0/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java index 5a71eb3..95b013a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java @@ -33,16 +33,29 @@ public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractMess /** Added node ID. 
*/ private final UUID nodeId; + /** Discovery data from new node. */ + private Map<Integer, byte[]> newNodeDiscoData; + + /** Discovery data from old nodes. */ + private Map<UUID, Map<Integer, byte[]>> oldNodesDiscoData; + /** * Constructor. * * @param creatorNodeId ID of the creator node (coordinator). * @param nodeId Added node ID. */ - public TcpDiscoveryNodeAddFinishedMessage(UUID creatorNodeId, UUID nodeId) { + public TcpDiscoveryNodeAddFinishedMessage( + UUID creatorNodeId, + UUID nodeId, + Map<Integer, byte[]> newNodeDiscoData, + Map<UUID, Map<Integer, byte[]>> oldNodesDiscoData + ) { super(creatorNodeId); this.nodeId = nodeId; + this.newNodeDiscoData = newNodeDiscoData; + this.oldNodesDiscoData = oldNodesDiscoData; } /** @@ -54,6 +67,31 @@ public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractMess return nodeId; } + /** + * @return Discovery data from new node. + */ + public Map<Integer, byte[]> newNodeDiscoveryData() { + return newNodeDiscoData; + } + + /** + * @return Discovery data from old nodes. + */ + public Map<UUID, Map<Integer, byte[]>> oldNodesDiscoveryData() { + return oldNodesDiscoData; + } + + /** + * @param nodeId Node ID. + * @param discoData Discovery data to add. + */ + public void addDiscoveryData(UUID nodeId, Map<Integer, byte[]> discoData) { + // Old nodes disco data may be null if message + // makes more than 1 pass due to stopping of the nodes in topology. + if (oldNodesDiscoData != null) + oldNodesDiscoData.put(nodeId, discoData); + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(TcpDiscoveryNodeAddFinishedMessage.class, this, "super", super.toString());