Repository: incubator-ignite Updated Branches: refs/heads/ignite-752 347eb70c8 -> 9878f4059
ignite-752: implemented connection check message Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9878f405 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9878f405 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9878f405 Branch: refs/heads/ignite-752 Commit: 9878f40599f327c819aa7fe6f45f2c20b8895108 Parents: 347eb70 Author: Denis Magda <dma...@gridgain.com> Authored: Fri Jul 17 22:10:45 2015 +0300 Committer: Denis Magda <dma...@gridgain.com> Committed: Fri Jul 17 22:10:45 2015 +0300 ---------------------------------------------------------------------- .../ignite/spi/discovery/tcp/ServerImpl.java | 52 ++++++-------------- .../spi/discovery/tcp/TcpDiscoveryImpl.java | 3 -- .../TcpDiscoveryConnectionCheckMessage.java | 3 -- 3 files changed, 16 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9878f405/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 74c7dbd..9e9921b 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -235,14 +235,11 @@ class ServerImpl extends TcpDiscoveryImpl { hbsSnd = new HeartbeatsSender(); hbsSnd.start(); - if (spi.failureDetectionThresholdEnabled()) { - chkConnWorker = new CheckConnectionWorker(); - chkConnWorker.start(); - } - else { - chkStatusSnd = new CheckStatusSender(); - chkStatusSnd.start(); - } + chkConnWorker = new CheckConnectionWorker(); + chkConnWorker.start(); + + chkStatusSnd = new CheckStatusSender(); + chkStatusSnd.start(); if (spi.ipFinder.isShared()) { ipFinderCleaner = new IpFinderCleaner(); @@ -332,14 +329,11 @@ class ServerImpl extends TcpDiscoveryImpl { U.interrupt(hbsSnd); U.join(hbsSnd, log); - if (spi.failureDetectionThresholdEnabled()) { - U.interrupt(chkConnWorker); - U.join(chkConnWorker, log); - } - else { - U.interrupt(chkStatusSnd); - U.join(chkStatusSnd, log); - } + U.interrupt(chkConnWorker); + U.join(chkConnWorker, log); + + U.interrupt(chkStatusSnd); + U.join(chkStatusSnd, log); U.interrupt(ipFinderCleaner); U.join(ipFinderCleaner, log); @@ -2070,10 +2064,9 @@ class ServerImpl extends TcpDiscoveryImpl { * Sends message across the ring. * * @param msg Message to send - * @return Response code. */ @SuppressWarnings({"BreakStatementWithLabel", "LabeledStatement", "ContinueStatementWithLabel"}) - private int sendMessageAcrossRing(TcpDiscoveryAbstractMessage msg) { + private void sendMessageAcrossRing(TcpDiscoveryAbstractMessage msg) { assert msg != null; assert ring.hasRemoteNodes(); @@ -2122,8 +2115,6 @@ class ServerImpl extends TcpDiscoveryImpl { UUID locNodeId = getLocalNodeId(); - int msgRes = RES_FAIL; - while (true) { if (searchNext) { TcpDiscoveryNode newNext = ring.nextNode(failedNodes); @@ -2411,17 +2402,17 @@ class ServerImpl extends TcpDiscoveryImpl { spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp); - msgRes = spi.readReceipt(sock, timeoutCtrl.nextTimeoutChunk(ackTimeout0)); + int res = spi.readReceipt(sock, timeoutCtrl.nextTimeoutChunk(ackTimeout0)); if (log.isDebugEnabled()) log.debug("Message has been sent to next node [msg=" + msg + ", next=" + next.id() + - ", res=" + msgRes + ']'); + ", res=" + res + ']'); if (debugMode) debugLog("Message has been sent to next node [msg=" + msg + ", next=" + next.id() + - ", res=" + msgRes + ']'); + ", res=" + res + ']'); } finally { clearNodeAddedMessage(msg); @@ -2548,8 +2539,6 @@ class ServerImpl extends TcpDiscoveryImpl { "To speed up failure detection please see 'Failure Detection' section under javadoc" + " for 'TcpDiscoverySpi'"); } - - return msgRes; } /** @@ -3990,18 +3979,10 @@ class ServerImpl extends TcpDiscoveryImpl { return; } - int res = RES_FAIL; - if (ring.hasRemoteNodes()) - res = sendMessageAcrossRing(msg); + sendMessageAcrossRing(msg); chkConnWorker.messageProcessed(); - - if (res == TcpDiscoveryConnectionCheckMessage.STATUS_RECON) { - U.warn(log, "Node is out of topology (probably, due to short-time network problems)."); - - notifyDiscovery(EVT_NODE_SEGMENTED, ring.topologyVersion(), locNode); - } } /** @@ -4579,8 +4560,7 @@ class ServerImpl extends TcpDiscoveryImpl { } } else if (msg instanceof TcpDiscoveryConnectionCheckMessage) { - spi.writeToSocket(msg, sock, ring.node(msg.creatorNodeId()) != null ? RES_OK : - TcpDiscoveryConnectionCheckMessage.STATUS_RECON, socketTimeout); + spi.writeToSocket(msg, sock, RES_OK, socketTimeout); continue; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9878f405/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java index 85b1f38..c1cf9ab 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java @@ -37,9 +37,6 @@ abstract class TcpDiscoveryImpl { /** Response OK. */ protected static final int RES_OK = 1; - /** Response FAIL. */ - protected static final int RES_FAIL = -1; - /** Response CONTINUE JOIN. */ protected static final int RES_CONTINUE_JOIN = 100; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9878f405/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java index 3249220..54c9761 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java @@ -26,9 +26,6 @@ import org.apache.ignite.spi.discovery.tcp.internal.*; * which directly replies to the sender without message re-translation to the coordinator. */ public class TcpDiscoveryConnectionCheckMessage extends TcpDiscoveryAbstractMessage { - /** Status RECONNECT. */ - public static final int STATUS_RECON = 500; - /** * Constructor. *