Repository: incubator-ignite Updated Branches: refs/heads/ignite-752 9319206bd -> c453ab8dc
ignite-752: removed heartbeats sender and status checker threads Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c453ab8d Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c453ab8d Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c453ab8d Branch: refs/heads/ignite-752 Commit: c453ab8dc19327ac8b7df8350bd65ed0e237c3ac Parents: 9319206 Author: Denis Magda <dma...@gridgain.com> Authored: Tue Jul 21 17:35:23 2015 +0300 Committer: Denis Magda <dma...@gridgain.com> Committed: Tue Jul 21 17:35:23 2015 +0300 ---------------------------------------------------------------------- .../ignite/spi/discovery/tcp/ServerImpl.java | 423 +++++-------------- ...TcpDiscoverySpiFailureThresholdSelfTest.java | 5 +- 2 files changed, 114 insertions(+), 314 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c453ab8d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 02e13ba..2a09c62 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -80,14 +80,6 @@ class ServerImpl extends TcpDiscoveryImpl { /** Client message workers. */ protected ConcurrentMap<UUID, ClientMessageWorker> clientMsgWorkers = new ConcurrentHashMap8<>(); - /** Metrics sender. */ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private HeartbeatsSender hbsSnd; - - /** Status checker. 
*/ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private CheckStatusSender chkStatusSnd; - /** IP finder cleaner. */ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") private IpFinderCleaner ipFinderCleaner; @@ -229,12 +221,6 @@ class ServerImpl extends TcpDiscoveryImpl { spi.stats.onJoinFinished(); - hbsSnd = new HeartbeatsSender(); - hbsSnd.start(); - - chkStatusSnd = spi.failureDetectionThresholdEnabled() ? new CheckConnectionWorker() : new CheckStatusSender(); - chkStatusSnd.start(); - if (spi.ipFinder.isShared()) { ipFinderCleaner = new IpFinderCleaner(); ipFinderCleaner.start(); @@ -319,12 +305,6 @@ class ServerImpl extends TcpDiscoveryImpl { U.interrupt(tmp); U.joinThreads(tmp, log); - U.interrupt(hbsSnd); - U.join(hbsSnd, log); - - U.interrupt(chkStatusSnd); - U.join(chkStatusSnd, log); - U.interrupt(ipFinderCleaner); U.join(ipFinderCleaner, log); @@ -624,8 +604,9 @@ class ServerImpl extends TcpDiscoveryImpl { if (locNode != null) locNode.lastDataReceivedTime(U.currentTimeMillis()); - if (chkStatusSnd != null) - chkStatusSnd.onDataReceived(); + if (msgWorker != null) + // Node receives messages from remote nodes, reset this flag. 
+ msgWorker.failureDetectionNotified = false; } } @@ -1291,12 +1272,6 @@ class ServerImpl extends TcpDiscoveryImpl { U.interrupt(tcpSrvr); U.join(tcpSrvr, log); - U.interrupt(hbsSnd); - U.join(hbsSnd, log); - - U.interrupt(chkStatusSnd); - U.join(chkStatusSnd, log); - U.interrupt(ipFinderCleaner); U.join(ipFinderCleaner, log); @@ -1386,12 +1361,6 @@ class ServerImpl extends TcpDiscoveryImpl { b.append(" Message worker: ").append(threadStatus(msgWorker)).append(U.nl()); - if (spi.failureDetectionThresholdEnabled()) - b.append(" Check connection worker: ").append(threadStatus(chkStatusSnd)).append(U.nl()); - else - b.append(" Check status sender: ").append(threadStatus(chkStatusSnd)).append(U.nl()); - - b.append(" HB sender: ").append(threadStatus(hbsSnd)).append(U.nl()); b.append(" IP finder cleaner: ").append(threadStatus(ipFinderCleaner)).append(U.nl()); b.append(" Stats printer: ").append(threadStatus(statsPrinter)).append(U.nl()); @@ -1475,261 +1444,6 @@ class ServerImpl extends TcpDiscoveryImpl { } /** - * Thread that sends heartbeats. - */ - private class HeartbeatsSender extends IgniteSpiThread { - /** - * Constructor. 
- */ - private HeartbeatsSender() { - super(spi.ignite().name(), "tcp-disco-hb-sender", log); - - setPriority(spi.threadPri); - } - - /** {@inheritDoc} */ - @SuppressWarnings("BusyWait") - @Override protected void body() throws InterruptedException { - while (!isLocalNodeCoordinator()) - Thread.sleep(1000); - - if (log.isDebugEnabled()) - log.debug("Heartbeats sender has been started."); - - UUID nodeId = getConfiguredNodeId(); - - while (!isInterrupted()) { - if (spiStateCopy() != CONNECTED) { - if (log.isDebugEnabled()) - log.debug("Stopping heartbeats sender (SPI is not connected to topology)."); - - return; - } - - TcpDiscoveryHeartbeatMessage msg = new TcpDiscoveryHeartbeatMessage(nodeId); - - msg.verify(getLocalNodeId()); - - msgWorker.addMessage(msg); - - Thread.sleep(spi.hbFreq); - } - } - } - - /** - * Thread that sends status check messages to next node if local node has not - * been receiving heartbeats ({@link TcpDiscoveryHeartbeatMessage}) - * for {@link TcpDiscoverySpi#getMaxMissedHeartbeats()} * - * {@link TcpDiscoverySpi#getHeartbeatFrequency()}. - */ - private class CheckStatusSender extends IgniteSpiThread { - /** Heartbeats check timeout. */ - protected long hbCheckTimeout; - - /** Time when the last status message has been sent. */ - protected long lastTimeStatusMsgSent; - - /** - * Constructor. - */ - private CheckStatusSender() { - super(spi.ignite().name(), "tcp-disco-status-check-sender", log); - - init(); - } - - /** - * Constructor. - * - * @param threadName Name of the thread. - */ - private CheckStatusSender(String threadName) { - super(spi.ignite().name(), threadName, log); - - init(); - } - - /** - * Inits {@code CheckStatusSender}. - */ - private void init() { - // Only 1 heartbeat missing is acceptable. Add 50 ms to avoid false alarm. 
- hbCheckTimeout = (long)spi.maxMissedHbs * spi.hbFreq + 50; - - setPriority(spi.threadPri); - } - - /** {@inheritDoc} */ - @SuppressWarnings("BusyWait") - @Override protected void body() throws InterruptedException { - if (log.isDebugEnabled()) - log.debug("Status check sender has been started."); - - while (!isInterrupted()) { - if (spiStateCopy() != CONNECTED) { - if (log.isDebugEnabled()) - log.debug("Stopping status check sender (SPI is not connected to topology)."); - - return; - } - - long hbTimeout = checkHeartbeats(); - - assert hbTimeout > 0; - - if (hbTimeout > 0) - Thread.sleep(hbTimeout); - } - } - - /** - * Check the last time a heartbeat message received. If the time is bigger than {@code hbCheckTimeout} than - * {@link TcpDiscoveryStatusCheckMessage} is sent accross the ring. - * - * @return Timeout to wait before calling this function again. - */ - protected long checkHeartbeats() { - // 1. Determine timeout. - if (lastTimeStatusMsgSent < locNode.lastUpdateTime()) - lastTimeStatusMsgSent = locNode.lastUpdateTime(); - - long timeout = (lastTimeStatusMsgSent + hbCheckTimeout) - U.currentTimeMillis(); - - // 2. Still need to wait before sending? - if (timeout > 0) - return timeout; - - msgWorker.addMessage(new TcpDiscoveryStatusCheckMessage(locNode, null)); - - // 3. Send status check message. - lastTimeStatusMsgSent = U.currentTimeMillis(); - - return (lastTimeStatusMsgSent + hbCheckTimeout) - U.currentTimeMillis(); - } - - /** - * Called when a chunck of data is received from a remote node. - */ - protected void onDataReceived() { - // No-op - } - - /** - * Signals that a message added to the messages queue by this thread has been processed. 
- */ - protected void messageProcessed() { - throw new UnsupportedOperationException(); - } - } - - /** - * Thread performs the following two tasks: - * <ul> - * <li> - * Send connection check message to the next node with {@link TcpDiscoverySpi#connCheckFreq} frequency; - * </li> - * <li> - * Sends status check messages to next node if local node has not - * been receiving heartbeats ({@link TcpDiscoveryHeartbeatMessage}) - * for {@link TcpDiscoverySpi#getMaxMissedHeartbeats()} * - * {@link TcpDiscoverySpi#getHeartbeatFrequency()}. - * </li> - * </ul> - */ - private class CheckConnectionWorker extends CheckStatusSender { - /** */ - private volatile boolean msgInQueue; - - /** */ - private volatile boolean logMsgPrinted; - - /** Time when the last status message has been sent. */ - private long lastTimeConnCheckMsgSent; - - /** - * Constructor - */ - public CheckConnectionWorker() { - super("tcp-disco-conn-check-worker"); - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException { - if (log.isDebugEnabled()) - log.debug("Connection check worker has been started."); - - while (!isInterrupted()) { - if (spiStateCopy() != CONNECTED) { - if (log.isDebugEnabled()) - log.debug("Stopping connection check worker (SPI is not connected to topology)."); - - return; - } - - long hbTimeout = checkHeartbeats(); - - assert hbTimeout > 0; - - long connTimeout = checkConnection(); - - assert connTimeout > 0; - - Thread.sleep(Math.min(hbTimeout, connTimeout)); - } - } - - /** - * Check connection aliveness status. - * - * @return Timeout to wait before caliing this method the next time. 
*/ private long checkConnection() { - if (U.currentTimeMillis() - locNode.lastDataReceivedTime() >= spi.failureDetectionThreshold() && - ring.hasRemoteNodes() && spiStateCopy() == CONNECTED) { - - if (!logMsgPrinted) { - log.info("Local node seems to be disconnected from topology (failure detection threshold " + - "is reached): [failureDetectionThreshold=" + spi.failureDetectionThreshold() + - ", connCheckFreq=" + spi.connCheckFreq + ']'); - - logMsgPrinted = true; - } - } - - if (msgInQueue) - return spi.connCheckFreq; - - long timeout = (lastTimeConnCheckMsgSent + spi.connCheckFreq) - U.currentTimeMillis(); - - if (timeout > 0) - return timeout; - - if (ring.hasRemoteNodes()) { - // Send the message using ring message worker in order to reuse an existed socket to the next node. - msgInQueue = true; - - msgWorker.addMessage(new TcpDiscoveryConnectionCheckMessage(locNode)); - - lastTimeConnCheckMsgSent = U.currentTimeMillis(); - } - - return spi.connCheckFreq; - } - - - /** {@inheritDoc} */ - @Override protected void onDataReceived() { - logMsgPrinted = false; - } - - /** {@inheritDoc} */ - @Override protected void messageProcessed() { - msgInQueue = false; - } - } - - /** * Thread that cleans IP finder and keeps it in the correct state, unregistering * addresses of the nodes that has left the topology. * <p> @@ -2051,6 +1765,21 @@ class ServerImpl extends TcpDiscoveryImpl { /** Socket. */ private Socket sock; + /** Last time status message has been sent. */ + private long lastTimeStatusMsgSent; + + /** Incoming heartbeats check frequency. */ + private long hbCheckFreq = (long)spi.maxMissedHbs * spi.hbFreq + 50; + + /** Time when the last connection check message has been sent. */ + private long lastTimeConnCheckMsgSent; + + /** Whether a message has been logged when the failure detection threshold is reached. */ + private volatile boolean failureDetectionNotified; + + /** Last time heartbeat message has been sent.
*/ + private long lastTimeHbMsgSent; + /** */ protected RingMessageWorker() { @@ -2117,6 +1846,15 @@ class ServerImpl extends TcpDiscoveryImpl { spi.stats.onMessageProcessingFinished(msg); } + /** {@inheritDoc} */ + @Override protected void noMessageLoop() { + checkConnection(); + + sendHeartbeatMessage(); + + checkHeartbeatsReceiving(); + } + /** * Sends message across the ring. * @@ -4039,7 +3777,6 @@ class ServerImpl extends TcpDiscoveryImpl { if (log.isDebugEnabled()) log.debug("Connection check message discarded (local node is leaving topology)."); - chkStatusSnd.messageProcessed(); return; } @@ -4047,27 +3784,19 @@ class ServerImpl extends TcpDiscoveryImpl { if (log.isDebugEnabled()) log.debug("Connection check message discarded (no next node in topology)."); - chkStatusSnd.messageProcessed(); return; } - try { - // Link to the 'next' node is updated only inside RingMessageWorker thread, no need to check on 'null'. - if (!next.version().greaterThanEqual(TcpDiscoverySpi.FAILURE_DETECTION_MAJOR_VER, - TcpDiscoverySpi.FAILURE_DETECTION_MINOR_VER, - TcpDiscoverySpi.FAILURE_DETECTION_MAINT_VER)) { - // Preserve backward compatibility with nodes of older versions. - TcpDiscoveryStatusCheckMessage stMsg = new TcpDiscoveryStatusCheckMessage(locNode, null); - stMsg.replacedConnCheckMsg(true); - - processStatusCheckMessage(stMsg); - } - else if (ring.hasRemoteNodes()) - sendMessageAcrossRing(msg); - } - finally { - chkStatusSnd.messageProcessed(); - } + // Link to the 'next' node is updated only inside RingMessageWorker thread, no need to check on 'null'. + if (!next.version().greaterThanEqual(TcpDiscoverySpi.FAILURE_DETECTION_MAJOR_VER, + TcpDiscoverySpi.FAILURE_DETECTION_MINOR_VER, TcpDiscoverySpi.FAILURE_DETECTION_MAINT_VER)) { + // Preserve backward compatibility with nodes of older versions. 
+ TcpDiscoveryStatusCheckMessage stMsg = new TcpDiscoveryStatusCheckMessage(locNode, null); + stMsg.replacedConnCheckMsg(true); + + processStatusCheckMessage(stMsg); + } else if (ring.hasRemoteNodes()) + sendMessageAcrossRing(msg); } /** @@ -4272,6 +4001,71 @@ class ServerImpl extends TcpDiscoveryImpl { } } } + + /** + * Sends heartbeat message if needed. + */ + private void sendHeartbeatMessage() { + if (!isLocalNodeCoordinator()) + return; + + long elapsed = (lastTimeHbMsgSent + spi.hbFreq) - U.currentTimeMillis(); + + if (elapsed > 0) + return; + + TcpDiscoveryHeartbeatMessage msg = new TcpDiscoveryHeartbeatMessage(getConfiguredNodeId()); + + msg.verify(getLocalNodeId()); + + msgWorker.addMessage(msg); + + lastTimeHbMsgSent = U.currentTimeMillis(); + } + + /** + * Checks the time elapsed since a heartbeat message was last received. If it exceeds {@code hbCheckFreq}, then + * {@link TcpDiscoveryStatusCheckMessage} is sent across the ring. + */ + private void checkHeartbeatsReceiving() { + if (lastTimeStatusMsgSent < locNode.lastUpdateTime()) + lastTimeStatusMsgSent = locNode.lastUpdateTime(); + + long elapsed = (lastTimeStatusMsgSent + hbCheckFreq) - U.currentTimeMillis(); + + if (elapsed > 0) + return; + + msgWorker.addMessage(new TcpDiscoveryStatusCheckMessage(locNode, null)); + + lastTimeStatusMsgSent = U.currentTimeMillis(); + } + + /** + * Check connection aliveness status.
+ */ + private void checkConnection() { + if (!failureDetectionNotified && U.currentTimeMillis() - locNode.lastDataReceivedTime() + >= spi.failureDetectionThreshold() && ring.hasRemoteNodes() && spiStateCopy() == CONNECTED) { + + log.info("Local node seems to be disconnected from topology (failure detection threshold " + + "is reached): [failureDetectionThreshold=" + spi.failureDetectionThreshold() + + ", connCheckFreq=" + spi.connCheckFreq + ']'); + + failureDetectionNotified = true; + } + + long elapsed = (lastTimeConnCheckMsgSent + spi.connCheckFreq) - U.currentTimeMillis(); + + if (elapsed > 0) + return; + + if (ring.hasRemoteNodes()) { + processConnectionCheckMessage(new TcpDiscoveryConnectionCheckMessage(locNode)); + + lastTimeConnCheckMsgSent = U.currentTimeMillis(); + } + } } /** @@ -5219,10 +5013,12 @@ class ServerImpl extends TcpDiscoveryImpl { log.debug("Message worker started [locNodeId=" + getConfiguredNodeId() + ']'); while (!isInterrupted()) { - TcpDiscoveryAbstractMessage msg = queue.poll(2000, TimeUnit.MILLISECONDS); + TcpDiscoveryAbstractMessage msg = queue.poll(10, TimeUnit.MILLISECONDS); - if (msg == null) + if (msg == null) { + noMessageLoop(); continue; + } processMessage(msg); } @@ -5268,6 +5064,13 @@ class ServerImpl extends TcpDiscoveryImpl { protected abstract void processMessage(TcpDiscoveryAbstractMessage msg); /** + * Called when there is no message to process giving ability to perform other activity. + */ + protected void noMessageLoop() { + // No-op. + } + + /** * @param sock Socket. * @param msg Message. * @param timeout Socket timeout. 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c453ab8d/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java index 4fdf886..362be15 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java @@ -37,7 +37,7 @@ import java.net.*; */ public class TcpDiscoverySpiFailureThresholdSelfTest extends AbstractDiscoverySelfTest { /** */ - private static final int SPI_COUNT = 7; + private static final int SPI_COUNT = 6; /** */ private static final long CONN_CHECK_FREQ = 2000; @@ -76,9 +76,6 @@ public class TcpDiscoverySpiFailureThresholdSelfTest extends AbstractDiscoverySe case 5: spi.setMaxAckTimeout(10000); break; - case 6: - spi.setNetworkTimeout(4000); - break; default: assert false; }