ignite-890: fix
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4201077f Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4201077f Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4201077f Branch: refs/heads/ignite-884 Commit: 4201077f85ee814c812c0cf8e3848ec0ffcad563 Parents: dda7727 Author: Denis Magda <dma...@gridgain.com> Authored: Tue Jul 14 13:53:06 2015 +0300 Committer: Denis Magda <dma...@gridgain.com> Committed: Tue Jul 14 13:53:06 2015 +0300 ---------------------------------------------------------------------- .../managers/discovery/GridDiscoveryManager.java | 4 ++-- .../spi/communication/tcp/TcpCommunicationSpi.java | 14 +++++--------- .../apache/ignite/spi/discovery/tcp/ClientImpl.java | 16 +++++++++------- .../apache/ignite/spi/discovery/tcp/ServerImpl.java | 16 +++++++++++----- .../discovery/tcp/internal/TcpDiscoveryNode.java | 6 ++++++ .../tcp/TcpDiscoveryMultiThreadedTest.java | 2 +- 6 files changed, 34 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4201077f/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index eae07ed..80f7f98 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -1589,7 +1589,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { new Runnable() { @Override public void run() { 
ctx.markSegmented(); - + System.out.println("Stopping grid on segmentation: " + ctx.gridName()); G.stop(ctx.gridName(), true); } } @@ -1896,7 +1896,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { segmented = true; if (!isLocDaemon) - U.warn(log, "Local node SEGMENTED: " + node); + U.warn(log, "Local node SEGMENTED: " + node + ", remote_nodes = " + getSpi().getRemoteNodes()); else if (log.isDebugEnabled()) log.debug("Local node SEGMENTED: " + node); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4201077f/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index dcc7037..423f4ef 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -1728,8 +1728,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter GridCommunicationClient client = clients.get(nodeId); if (client == null) { - //if (isNodeStopping()) - // throw new IgniteSpiException("Node is stopping."); + if (isNodeStopping()) + throw new IgniteSpiException("Node is stopping."); // Do not allow concurrent connects. 
GridFutureAdapter<GridCommunicationClient> fut = new ConnectFuture(); @@ -1899,8 +1899,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } } catch (IgniteCheckedException | RuntimeException | Error e) { - if (!getSpiContext().localNode().isClient() && node.isClient()) - getSpiContext().tryFailNode(node.id(), "Killing client"); + //tryFailClient(node, e); if (log.isDebugEnabled()) log.debug( @@ -2100,10 +2099,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } } catch (Exception e) { - if (X.hasCause(e, HandshakeFailureException.class) && node.isClient() && - !getSpiContext().isStopping()) - getSpiContext().tryFailNode(node.id(), "Killing client: " + e.getMessage()); - if (client != null) { client.forceClose(); @@ -2146,7 +2141,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (client == null) { assert errs != null; - if (X.hasCause(errs, ConnectException.class)) +// if (!tryFailClient(node, errs) && X.hasCause(errs, ConnectException.class)) + if (X.hasCause(errs, ConnectException.class)) LT.warn(log, null, "Failed to connect to a remote node " + "(make sure that destination node is alive and " + "operating system firewall is disabled on local and remote hosts) " + http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4201077f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index bb8d52d..467acc6 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -432,8 +432,10 @@ class ClientImpl extends TcpDiscoveryImpl { } if (addrs.isEmpty()) { - if (timeout > 0 && (U.currentTimeMillis() - startTime) > timeout) + if (timeout > 0 && 
(U.currentTimeMillis() - startTime) > timeout) { + System.out.println("Client reconnect timeout: " + getLocalNodeId()); return null; + } U.warn(log, "Failed to connect to any address from IP finder (will retry to join topology " + "in 2000ms): " + addrs0); @@ -595,11 +597,9 @@ class ClientImpl extends TcpDiscoveryImpl { NavigableSet<ClusterNode> allNodes = allVisibleNodes(); if (!topHist.containsKey(topVer)) { - assert topHist.isEmpty() || topHist.lastKey() == topVer - 1 : - "lastVer=" + (topHist.isEmpty() ? null : topHist.lastKey()) + - ", newVer=" + topVer + - ", locNode=" + locNode + - ", msg=" + msg; + if (!topHist.isEmpty() && topHist.lastKey() != topVer - 1) + log.warning("Missing particular topology version [lastVer=" + (topHist.isEmpty() ? null : + topHist.lastKey()) + ", newVer=" + topVer + ", locNode=" + locNode + ", msg=" + msg); topHist.put(topVer, allNodes); @@ -796,7 +796,7 @@ class ClientImpl extends TcpDiscoveryImpl { catch (IOException e) { msgWorker.addMessage(new SocketClosedMessage(sock)); - if (log.isDebugEnabled()) + //if (log.isDebugEnabled()) U.error(log, "Connection failed [sock=" + sock + ", locNodeId=" + getLocalNodeId() + ']', e); } finally { @@ -1151,6 +1151,7 @@ class ClientImpl extends TcpDiscoveryImpl { if (((SocketClosedMessage)msg).sock == currSock) { currSock = null; + System.out.println("Socket closed. Join latch: " + joinLatch.getCount() + ". 
Node: " + getLocalNodeId()); boolean join = joinLatch.getCount() > 0; if (spi.getSpiContext().isStopping() || segmented) { @@ -1165,6 +1166,7 @@ class ClientImpl extends TcpDiscoveryImpl { else { assert reconnector == null; + System.out.println("Starting reconnector: " + getLocalNodeId()); final Reconnector reconnector = new Reconnector(join); this.reconnector = reconnector; reconnector.start(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4201077f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 33abe55..cda026f 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -1710,8 +1710,6 @@ class ServerImpl extends TcpDiscoveryImpl { if (res != null && msg.verified()) res.add(prepare(msg, node.id())); - else - log.info("(1) Skipping message with [topVer=" + msg.topologyVersion() + ", msg=" + msg + ']'); } if (log.isDebugEnabled()) { @@ -1738,8 +1736,6 @@ class ServerImpl extends TcpDiscoveryImpl { } else if (msg.verified()) cp.add(prepare(msg, node.id())); - else - log.info("(2) Skipping message with [topVer=" + msg.topologyVersion() + ", msg=" + msg + ']'); } cp = !skip ? cp : null; @@ -3769,9 +3765,19 @@ class ServerImpl extends TcpDiscoveryImpl { if (clientNodeIds.contains(clientNode.id())) clientNode.aliveCheck(spi.maxMissedClientHbs); else { + if (!isLocalNodeCoordinator()) + continue; + + if (clientNode.aliveCheck() == 0) + // Node may have just become coordinator without receiving any client heartbeat.
+ clientNode.aliveCheck(spi.maxMissedClientHbs); + int aliveCheck = clientNode.decrementAliveCheck(); - if (aliveCheck == 0 && isLocalNodeCoordinator()) { + if (aliveCheck == 0) { + // Make aliveCheck negative + clientNode.decrementAliveCheck(); + processNodeFailedMessage(new TcpDiscoveryNodeFailedMessage(locNodeId, clientNode.id(), clientNode.internalOrder())); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4201077f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java index 22f56c3..46844ba 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java @@ -427,6 +427,12 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste this.aliveCheck = aliveCheck; } + public int aliveCheck() { + assert isClient(); + + return aliveCheck; + } + /** * @return Client router node ID. 
*/ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4201077f/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java index 61fe95d..4e5c68e 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java @@ -40,7 +40,7 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest { private static final int GRID_CNT = 5; /** */ - private static final int CLIENT_GRID_CNT = 5; + private static final int CLIENT_GRID_CNT = 1; /** */ private static final ThreadLocal<Boolean> clientFlagPerThread = new ThreadLocal<>();