ignite-752: optimized connection check message impl
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/0cc31b27 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/0cc31b27 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/0cc31b27 Branch: refs/heads/ignite-752 Commit: 0cc31b27d449e18251e1d681a401f21365f8e529 Parents: 9878f40 Author: Denis Magda <dma...@gridgain.com> Authored: Sun Jul 19 09:54:13 2015 +0300 Committer: Denis Magda <dma...@gridgain.com> Committed: Sun Jul 19 09:54:13 2015 +0300 ---------------------------------------------------------------------- .../ignite/spi/discovery/tcp/ServerImpl.java | 248 ++++++++++++------- .../spi/discovery/tcp/TcpDiscoveryImpl.java | 2 +- .../spi/discovery/tcp/TcpDiscoverySpi.java | 6 +- .../TcpDiscoveryStatusCheckMessage.java | 19 ++ 4 files changed, 179 insertions(+), 96 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0cc31b27/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 9e9921b..f05d027 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -88,9 +88,6 @@ class ServerImpl extends TcpDiscoveryImpl { @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") private CheckStatusSender chkStatusSnd; - /** Connection checker. */ - private CheckConnectionWorker chkConnWorker; - /** IP finder cleaner. 
*/ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") private IpFinderCleaner ipFinderCleaner; @@ -235,10 +232,7 @@ class ServerImpl extends TcpDiscoveryImpl { hbsSnd = new HeartbeatsSender(); hbsSnd.start(); - chkConnWorker = new CheckConnectionWorker(); - chkConnWorker.start(); - - chkStatusSnd = new CheckStatusSender(); + chkStatusSnd = spi.failureDetectionThresholdEnabled() ? new CheckConnectionWorker() : new CheckStatusSender(); chkStatusSnd.start(); if (spi.ipFinder.isShared()) { @@ -329,9 +323,6 @@ class ServerImpl extends TcpDiscoveryImpl { U.interrupt(hbsSnd); U.join(hbsSnd, log); - U.interrupt(chkConnWorker); - U.join(chkConnWorker, log); - U.interrupt(chkStatusSnd); U.join(chkStatusSnd, log); @@ -629,10 +620,10 @@ class ServerImpl extends TcpDiscoveryImpl { } /** {@inheritDoc} */ - @Override protected void onDataRead() { + @Override protected void onDataReceived() { if (spi.failureDetectionThresholdEnabled()) { locNode.lastDataReceivedTime(U.currentTimeMillis()); - chkConnWorker.reset(); + chkStatusSnd.onDataReceived(); } } @@ -1304,14 +1295,8 @@ class ServerImpl extends TcpDiscoveryImpl { U.interrupt(hbsSnd); U.join(hbsSnd, log); - if (spi.failureDetectionThresholdEnabled()) { - U.interrupt(chkConnWorker); - U.join(chkConnWorker, log); - } - else { - U.interrupt(chkStatusSnd); - U.join(chkStatusSnd, log); - } + U.interrupt(chkStatusSnd); + U.join(chkStatusSnd, log); U.interrupt(ipFinderCleaner); U.join(ipFinderCleaner, log); @@ -1403,7 +1388,7 @@ class ServerImpl extends TcpDiscoveryImpl { b.append(" Message worker: ").append(threadStatus(msgWorker)).append(U.nl()); if (spi.failureDetectionThresholdEnabled()) - b.append(" Check connectino worker: ").append(threadStatus(chkConnWorker)).append(U.nl()); + b.append(" Check connection worker: ").append(threadStatus(chkStatusSnd)).append(U.nl()); else b.append(" Check status sender: ").append(threadStatus(chkStatusSnd)).append(U.nl()); @@ -1540,12 +1525,39 @@ class ServerImpl extends 
TcpDiscoveryImpl { * {@link TcpDiscoverySpi#getHeartbeatFrequency()}. */ private class CheckStatusSender extends IgniteSpiThread { + /** Heartbeats check timeout. */ + protected long hbCheckTimeout; + + /** Time when the last status message has been sent. */ + protected long lastTimeStatusMsgSent; + /** * Constructor. */ private CheckStatusSender() { super(spi.ignite().name(), "tcp-disco-status-check-sender", log); + init(); + } + + /** + * Constructor. + * + * @param threadName Name of the thread. + */ + private CheckStatusSender(String threadName) { + super(spi.ignite().name(), threadName, log); + + init(); + } + + /** + * Inits {@code CheckStatusSender}. + */ + private void init() { + // Only 1 heartbeat missing is acceptable. Add 50 ms to avoid false alarm. + hbCheckTimeout = (long)spi.maxMissedHbs * spi.hbFreq + 50; + setPriority(spi.threadPri); } @@ -1555,22 +1567,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (log.isDebugEnabled()) log.debug("Status check sender has been started."); - // Only 1 heartbeat missing is acceptable. Add 50 ms to avoid false alarm. - long checkTimeout = (long)spi.maxMissedHbs * spi.hbFreq + 50; - - long lastSent = 0; - while (!isInterrupted()) { - // 1. Determine timeout. - if (lastSent < locNode.lastUpdateTime()) - lastSent = locNode.lastUpdateTime(); - - long timeout = (lastSent + checkTimeout) - U.currentTimeMillis(); - - if (timeout > 0) - Thread.sleep(timeout); - - // 2. Check if SPI is still connected. if (spiStateCopy() != CONNECTED) { if (log.isDebugEnabled()) log.debug("Stopping status check sender (SPI is not connected to topology)."); @@ -1578,41 +1575,73 @@ class ServerImpl extends TcpDiscoveryImpl { return; } - // 3. Was there an update? 
- if (locNode.lastUpdateTime() > lastSent || !ring.hasRemoteNodes()) { - if (log.isDebugEnabled()) - log.debug("Skipping status check send " + - "[locNodeLastUpdate=" + U.format(locNode.lastUpdateTime()) + - ", hasRmts=" + ring.hasRemoteNodes() + ']'); + long hbTimeout = checkHeartbeats(); - continue; - } - - // 4. Send status check message. - lastSent = U.currentTimeMillis(); + assert hbTimeout > 0; - msgWorker.addMessage(new TcpDiscoveryStatusCheckMessage(locNode, null)); + if (hbTimeout > 0) + Thread.sleep(hbTimeout); } } + + /** + * Checks the last time a heartbeat message was received. If more time than {@code hbCheckTimeout} has elapsed, then + * a {@link TcpDiscoveryStatusCheckMessage} is sent across the ring. + * + * @return Timeout to wait before calling this function again. + */ + protected long checkHeartbeats() { + // 1. Determine timeout. + if (lastTimeStatusMsgSent < locNode.lastUpdateTime()) + lastTimeStatusMsgSent = locNode.lastUpdateTime(); + + long timeout = (lastTimeStatusMsgSent + hbCheckTimeout) - U.currentTimeMillis(); + + // 2. Still need to wait before sending? + if (timeout > 0) + return timeout; + + msgWorker.addMessage(new TcpDiscoveryStatusCheckMessage(locNode, null)); + + // 3. Send status check message. + lastTimeStatusMsgSent = U.currentTimeMillis(); + + return (lastTimeStatusMsgSent + hbCheckTimeout) - U.currentTimeMillis(); + } + + /** + * Called when a chunk of data is received from a remote node. + */ + protected void onDataReceived() { + // No-op + } + + /** + * Signals that a message added to the messages queue by this thread has been processed. + */ + protected void messageProcessed() { + throw new UnsupportedOperationException(); + } } /** * TODO: IGNITE-752 */ - private class CheckConnectionWorker extends IgniteSpiThread { + private class CheckConnectionWorker extends CheckStatusSender { /** */ private volatile boolean msgInQueue; /** */ private volatile boolean logMsgPrinted; + /** Time when the last connection check message has been sent.
*/ + private long lastTimeConnCheckMsgSent; + /** * Constructor */ public CheckConnectionWorker() { - super(spi.ignite().name(), "tcp-disco-conn-check-worker", log); - - setPriority(spi.threadPri); + super("tcp-disco-conn-check-worker"); } /** {@inheritDoc} */ @@ -1628,46 +1657,63 @@ class ServerImpl extends TcpDiscoveryImpl { return; } - if (U.currentTimeMillis() - locNode.lastDataReceivedTime() >= spi.failureDetectionThreshold() && - ring.hasRemoteNodes() && spiStateCopy() == CONNECTED) { + long hbTimeout = checkHeartbeats(); - if (!logMsgPrinted) { - log.info("Local node seems to be disconnected from topology (failure detection threshold " + - "is reached): [failureDetectionThreshold=" + spi.failureDetectionThreshold() + - ", connCheckFreq=" + spi.connCheckFreq + ']'); + assert hbTimeout > 0; - logMsgPrinted = true; - } - } + long connTimeout = checkConnection(); - if (msgInQueue) { - Thread.sleep(spi.connCheckFreq); + assert connTimeout > 0; - continue; - } + Thread.sleep(Math.min(hbTimeout, connTimeout)); + } + } - if (ring.hasRemoteNodes()) { - // Send the message using ring message worker in order to reuse an existed socket to the next node. 
+ msgInQueue = true; + /** + * Logs a message if the failure detection threshold has been reached and, at most once per {@code connCheckFreq}, + * sends a {@link TcpDiscoveryConnectionCheckMessage} to the next node via the ring message worker. + * + * @return Timeout to wait before calling this method again. + */ + private long checkConnection() { + if (U.currentTimeMillis() - locNode.lastDataReceivedTime() >= spi.failureDetectionThreshold() && + ring.hasRemoteNodes() && spiStateCopy() == CONNECTED) { + + if (!logMsgPrinted) { + log.info("Local node seems to be disconnected from topology (failure detection threshold " + + "is reached): [failureDetectionThreshold=" + spi.failureDetectionThreshold() + + ", connCheckFreq=" + spi.connCheckFreq + ']'); - msgWorker.addMessage(new TcpDiscoveryConnectionCheckMessage(locNode)); + logMsgPrinted = true; } + } + + if (msgInQueue) + return spi.connCheckFreq; + + long timeout = (lastTimeConnCheckMsgSent + spi.connCheckFreq) - U.currentTimeMillis(); - Thread.sleep(spi.connCheckFreq); + if (timeout > 0) + return timeout; + + if (ring.hasRemoteNodes()) { + // Send the message using ring message worker in order to reuse an existing socket to the next node. + msgInQueue = true; + + msgWorker.addMessage(new TcpDiscoveryConnectionCheckMessage(locNode)); + + lastTimeConnCheckMsgSent = U.currentTimeMillis(); } + + return spi.connCheckFreq; } - /** - * TODO: IGNITE-752 - */ - private void reset() { + + /** {@inheritDoc} */ + @Override protected void onDataReceived() { logMsgPrinted = false; } - /** - * TODO: IGNITE-752 - */ - private void messageProcessed() { + /** {@inheritDoc} */ + protected void messageProcessed() { msgInQueue = false; } } @@ -2129,6 +2175,11 @@ class ServerImpl extends TcpDiscoveryImpl { if (ring.hasRemoteNodes()) { msg.senderNodeId(locNodeId); + if (msg instanceof TcpDiscoveryConnectionCheckMessage || + (msg instanceof TcpDiscoveryStatusCheckMessage && + ((TcpDiscoveryStatusCheckMessage)msg).replacedConnCheckMsg())) + break; + addMessage(msg); } @@ -2380,16 +2431,6 @@ class ServerImpl extends TcpDiscoveryImpl { } } - if (msg instanceof TcpDiscoveryConnectionCheckMessage && next.version().greaterThanEqual( - TcpDiscoverySpi.FAILURE_DETECTION_MAJOR_VER, -
TcpDiscoverySpi.FAILURE_DETECTION_MINOR_VER, - TcpDiscoverySpi.FAILURE_DETECTION_MAINT_VER)) { - // Preserve backward compatibility with nodes of older versions. - assert msg.creatorNodeId().equals(getLocalNodeId()); - - msg = new TcpDiscoveryStatusCheckMessage(locNode, null); - } - prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId); try { @@ -3973,16 +4014,37 @@ class ServerImpl extends TcpDiscoveryImpl { if (spiStateCopy() != CONNECTED) { if (log.isDebugEnabled()) - log.debug("Connection check message discarded (local node receives updates)."); + log.debug("Connection check message discarded (local node is leaving topology)."); - chkConnWorker.messageProcessed(); + chkStatusSnd.messageProcessed(); return; } - if (ring.hasRemoteNodes()) - sendMessageAcrossRing(msg); + if (next == null) { + if (log.isDebugEnabled()) + log.debug("Connection check message discarded (no next node in topology)."); + + chkStatusSnd.messageProcessed(); + return; + } - chkConnWorker.messageProcessed(); + try { + // Link to the 'next' node is updated only inside RingMessageWorker thread, so it cannot become 'null' after the check above. + if (!next.version().greaterThanEqual(TcpDiscoverySpi.FAILURE_DETECTION_MAJOR_VER, + TcpDiscoverySpi.FAILURE_DETECTION_MINOR_VER, + TcpDiscoverySpi.FAILURE_DETECTION_MAINT_VER)) { + // Preserve backward compatibility with nodes of older versions.
+ TcpDiscoveryStatusCheckMessage stMsg = new TcpDiscoveryStatusCheckMessage(locNode, null); + stMsg.replacedConnCheckMsg(true); + + processStatusCheckMessage(stMsg); + } + else if (ring.hasRemoteNodes()) + sendMessageAcrossRing(msg); + } + finally { + chkStatusSnd.messageProcessed(); + } } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0cc31b27/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java index c1cf9ab..20d49df 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java @@ -133,7 +133,7 @@ abstract class TcpDiscoveryImpl { /** * TODO: IGNITE-752 */ - protected void onDataRead() { + protected void onDataReceived() { // No-op } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0cc31b27/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index fcba8c6..f231c29 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -1362,7 +1362,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T T res = marsh.unmarshal(in == null ? 
sock.getInputStream() : in, U.gridClassLoader()); - impl.onDataRead(); + impl.onDataReceived(); return res; } @@ -1405,7 +1405,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T if (res == -1) throw new EOFException(); - impl.onDataRead(); + impl.onDataReceived(); return res; } @@ -1655,6 +1655,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T assertParameter(maxAckTimeout > ackTimeout, "maxAckTimeout > ackTimeout"); assertParameter(reconCnt > 0, "reconnectCnt > 0"); } + else + assertParameter(connCheckFreq < failureDetectionThreshold(), "failureDetectionThreshold > connCheckFreq"); assertParameter(ipFinder != null, "ipFinder != null"); assertParameter(hbFreq > 0, "heartbeatFreq > 0"); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0cc31b27/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java index bec7093..b2c2e07 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java @@ -49,6 +49,9 @@ public class TcpDiscoveryStatusCheckMessage extends TcpDiscoveryAbstractMessage /** Creator node status (initialized by coordinator). */ private int status; + /** Whether this message replaced {@link TcpDiscoveryConnectionCheckMessage} to preserve backward compatibility. */ + private transient boolean replacedConnCheckMsg; + /** * Constructor. 
* @@ -98,6 +101,22 @@ public class TcpDiscoveryStatusCheckMessage extends TcpDiscoveryAbstractMessage this.status = status; } + /** + * TODO: IGNITE-752 + * @return + */ + public boolean replacedConnCheckMsg() { + return replacedConnCheckMsg; + } + + /** + * TODO: IGNITE-752 + * @param replacedConnCheckMsg + */ + public void replacedConnCheckMsg(boolean replacedConnCheckMsg) { + this.replacedConnCheckMsg = replacedConnCheckMsg; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(TcpDiscoveryStatusCheckMessage.class, this, "super", super.toString());