# IGNITE-709 Fail remote nodes on TcpClientDiscoverySpi.dissconnect()
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/bcff2103 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/bcff2103 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/bcff2103 Branch: refs/heads/ignite-836_2 Commit: bcff2103026318e50f6a4e3ac5d018bd26073e2e Parents: 71f9172 Author: sevdokimov <sevdoki...@gridgain.com> Authored: Thu May 14 16:20:16 2015 +0300 Committer: sevdokimov <sevdoki...@gridgain.com> Committed: Thu May 14 16:20:16 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheUtils.java | 14 +- .../GridDhtPartitionsExchangeFuture.java | 7 - .../discovery/tcp/TcpClientDiscoverySpi.java | 139 +++++++++++-------- .../tcp/internal/TcpDiscoveryNodesRing.java | 2 +- .../tcp/TcpClientDiscoverySpiSelfTest.java | 4 + 5 files changed, 95 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcff2103/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index 549f42f..8e082c0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -631,9 +631,14 @@ public class GridCacheUtils { * @return Oldest node for the given topology version. */ public static ClusterNode oldest(GridCacheContext cctx, AffinityTopologyVersion topOrder) { + Collection<ClusterNode> aliveCacheNodes = aliveNodes(cctx, topOrder); + + if (aliveCacheNodes.isEmpty()) + return cctx.localNode(); + ClusterNode oldest = null; - for (ClusterNode n : aliveNodes(cctx, topOrder)) + for (ClusterNode n : aliveCacheNodes) if (oldest == null || n.order() < oldest.order()) oldest = n; @@ -651,9 +656,14 @@ public class GridCacheUtils { * @return Oldest node for the given topology version. */ public static ClusterNode oldest(GridCacheSharedContext cctx, AffinityTopologyVersion topOrder) { + Collection<ClusterNode> aliveCacheNodes = aliveCacheNodes(cctx, topOrder); + + if (aliveCacheNodes.isEmpty()) + return cctx.localNode(); + ClusterNode oldest = null; - for (ClusterNode n : aliveCacheNodes(cctx, topOrder)) { + for (ClusterNode n : aliveCacheNodes) { if (oldest == null || n.order() < oldest.order()) oldest = n; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcff2103/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 4b8db00..92dad4c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -398,13 +398,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } /** - * @return Oldest node. - */ - ClusterNode oldestNode() { - return oldestNode.get(); - } - - /** * @return Exchange ID. */ public GridDhtPartitionExchangeId exchangeId() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcff2103/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java index 8cdd7d1..8b32dd2 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java @@ -73,6 +73,9 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp /** Remote nodes. */ private final ConcurrentMap<UUID, TcpDiscoveryNode> rmtNodes = new ConcurrentHashMap8<>(); + /** Topology history. */ + private final NavigableMap<Long, Collection<ClusterNode>> topHist = new TreeMap<>(); + /** Remote nodes. */ private final ConcurrentMap<UUID, GridFutureAdapter<Boolean>> pingFuts = new ConcurrentHashMap8<>(); @@ -345,11 +348,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp /** {@inheritDoc} */ @Override public Collection<ClusterNode> getRemoteNodes() { - return F.view(U.<TcpDiscoveryNode, ClusterNode>arrayList(rmtNodes.values(), new P1<TcpDiscoveryNode>() { - @Override public boolean apply(TcpDiscoveryNode node) { - return node.visible(); - } - })); + return U.arrayList(rmtNodes.values(), TcpDiscoveryNodesRing.VISIBLE_NODES); } /** {@inheritDoc} */ @@ -419,6 +418,29 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp U.join(msgWorker, log); U.join(sockWriter, log); U.join(sockReader, log); + + leaveLatch.countDown(); + joinLatch.countDown(); + + getSpiContext().deregisterPorts(); + + Collection<ClusterNode> rmts = getRemoteNodes(); + + // This is restart/disconnection and remote nodes are not empty. + // We need to fire FAIL event for each. + DiscoverySpiListener lsnr = this.lsnr; + + if (lsnr != null) { + for (ClusterNode n : rmts) { + rmtNodes.remove(n.id()); + + Collection<ClusterNode> top = updateTopologyHistory(topVer + 1); + + lsnr.onDiscovery(EVT_NODE_FAILED, topVer, n, top, new TreeMap<>(topHist), null); + } + } + + rmtNodes.clear(); } /** {@inheritDoc} */ @@ -555,6 +577,47 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp } /** + * @param topVer New topology version. + * @return Latest topology snapshot. + */ + private NavigableSet<ClusterNode> updateTopologyHistory(long topVer) { + this.topVer = topVer; + + NavigableSet<ClusterNode> allNodes = allVisibleNodes(); + + if (!topHist.containsKey(topVer)) { + assert topHist.isEmpty() || topHist.lastKey() == topVer - 1 : + "lastVer=" + topHist.lastKey() + ", newVer=" + topVer; + + topHist.put(topVer, allNodes); + + if (topHist.size() > topHistSize) + topHist.pollFirstEntry(); + + assert topHist.lastKey() == topVer; + assert topHist.size() <= topHistSize; + } + + return allNodes; + } + + /** + * @return All nodes. + */ + private NavigableSet<ClusterNode> allVisibleNodes() { + NavigableSet<ClusterNode> allNodes = new TreeSet<>(); + + for (TcpDiscoveryNode node : rmtNodes.values()) { + if (node.visible()) + allNodes.add(node); + } + + allNodes.add(locNode); + + return allNodes; + } + + /** * @param addr Address. * @return Remote node ID. * @throws IOException In case of I/O error. @@ -932,9 +995,6 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp * Message worker. */ private class MessageWorker extends IgniteSpiThread { - /** Topology history. */ - private final NavigableMap<Long, Collection<ClusterNode>> topHist = new TreeMap<>(); - /** Message queue. */ private final BlockingDeque<Object> queue = new LinkedBlockingDeque<>(); @@ -1042,7 +1102,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp reconnector.cancel(); reconnector.join(); - notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allNodes()); + notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes()); } } else { @@ -1113,7 +1173,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp else if (msg instanceof TcpDiscoveryClientPingResponse) processClientPingResponse((TcpDiscoveryClientPingResponse)msg); else if (msg instanceof TcpDiscoveryPingRequest) - processPingRequest((TcpDiscoveryPingRequest)msg); + processPingRequest(); stats.onMessageProcessingFinished(msg); } @@ -1223,7 +1283,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp if (locNodeVer.equals(node.version())) node.version(locNodeVer); - Collection<ClusterNode> top = updateTopologyHistory(topVer); + NavigableSet<ClusterNode> top = updateTopologyHistory(topVer); if (!pending && joinLatch.getCount() > 0) { if (log.isDebugEnabled()) @@ -1261,7 +1321,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp return; } - Collection<ClusterNode> top = updateTopologyHistory(msg.topologyVersion()); + NavigableSet<ClusterNode> top = updateTopologyHistory(msg.topologyVersion()); if (!pending && joinLatch.getCount() > 0) { if (log.isDebugEnabled()) @@ -1303,7 +1363,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp return; } - Collection<ClusterNode> top = updateTopologyHistory(msg.topologyVersion()); + NavigableSet<ClusterNode> top = updateTopologyHistory(msg.topologyVersion()); if (!pending && joinLatch.getCount() > 0) { if (log.isDebugEnabled()) @@ -1399,7 +1459,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp try { Serializable msgObj = marsh.unmarshal(msg.messageBytes(), U.gridClassLoader()); - notifyDiscovery(EVT_DISCOVERY_CUSTOM_EVT, topVer, node, allNodes(), msgObj); + notifyDiscovery(EVT_DISCOVERY_CUSTOM_EVT, topVer, node, allVisibleNodes(), msgObj); } catch (IgniteCheckedException e) { U.error(log, "Failed to unmarshal discovery custom message.", e); @@ -1423,10 +1483,8 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp /** * Router want to ping this client. - * - * @param msg Message. */ - private void processPingRequest(TcpDiscoveryPingRequest msg) { + private void processPingRequest() { sockWriter.sendMessage(new TcpDiscoveryPingResponse(getLocalNodeId())); } @@ -1453,60 +1511,19 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp node.lastUpdateTime(tstamp); - notifyDiscovery(EVT_NODE_METRICS_UPDATED, topVer, node, allNodes()); + notifyDiscovery(EVT_NODE_METRICS_UPDATED, topVer, node, allVisibleNodes()); } else if (log.isDebugEnabled()) log.debug("Received metrics from unknown node: " + nodeId); } /** - * @param topVer New topology version. - * @return Latest topology snapshot. - */ - private Collection<ClusterNode> updateTopologyHistory(long topVer) { - TcpClientDiscoverySpi.this.topVer = topVer; - - Collection<ClusterNode> allNodes = allNodes(); - - if (!topHist.containsKey(topVer)) { - assert topHist.isEmpty() || topHist.lastKey() == topVer - 1 : - "lastVer=" + topHist.lastKey() + ", newVer=" + topVer; - - topHist.put(topVer, allNodes); - - if (topHist.size() > topHistSize) - topHist.pollFirstEntry(); - - assert topHist.lastKey() == topVer; - assert topHist.size() <= topHistSize; - } - - return allNodes; - } - - /** - * @return All nodes. - */ - private Collection<ClusterNode> allNodes() { - Collection<ClusterNode> allNodes = new TreeSet<>(); - - for (TcpDiscoveryNode node : rmtNodes.values()) { - if (node.visible()) - allNodes.add(node); - } - - allNodes.add(locNode); - - return allNodes; - } - - /** * @param type Event type. * @param topVer Topology version. * @param node Node. * @param top Topology snapshot. */ - private void notifyDiscovery(int type, long topVer, ClusterNode node, Collection<ClusterNode> top) { + private void notifyDiscovery(int type, long topVer, ClusterNode node, NavigableSet<ClusterNode> top) { notifyDiscovery(type, topVer, node, top, null); } @@ -1516,7 +1533,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp * @param node Node. * @param top Topology snapshot. */ - private void notifyDiscovery(int type, long topVer, ClusterNode node, Collection<ClusterNode> top, + private void notifyDiscovery(int type, long topVer, ClusterNode node, NavigableSet<ClusterNode> top, @Nullable Serializable data) { DiscoverySpiListener lsnr = TcpClientDiscoverySpi.this.lsnr; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcff2103/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java index e866504..e9eaa1d 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java @@ -32,7 +32,7 @@ import java.util.concurrent.locks.*; */ public class TcpDiscoveryNodesRing { /** Visible nodes filter. */ - private static final IgnitePredicate<TcpDiscoveryNode> VISIBLE_NODES = new P1<TcpDiscoveryNode>() { + public static final IgnitePredicate<TcpDiscoveryNode> VISIBLE_NODES = new P1<TcpDiscoveryNode>() { @Override public boolean apply(TcpDiscoveryNode node) { return node.visible(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcff2103/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java index 1268a23..119fc53 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java @@ -518,6 +518,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { }; G.addListener(lsnr); + final TcpClientDiscoverySpi client2Disco = (TcpClientDiscoverySpi)G.ignite("client-2").configuration().getDiscoverySpi(); + try { failServer(2); @@ -531,6 +533,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { finally { G.removeListener(lsnr); } + + assert client2Disco.getRemoteNodes().isEmpty(); } /**