ignite-752: reduced size of connection check message, automatic connection check frequency calculation
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a0edbbc0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a0edbbc0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a0edbbc0 Branch: refs/heads/ignite-752 Commit: a0edbbc0a800748633cd3f92d4537a04e8bbc2ba Parents: 27b426b Author: Denis Magda <dma...@gridgain.com> Authored: Thu Jul 23 10:36:57 2015 +0300 Committer: Denis Magda <dma...@gridgain.com> Committed: Thu Jul 23 10:36:57 2015 +0300 ---------------------------------------------------------------------- .../configuration/IgniteConfiguration.java | 2 +- .../ignite/spi/discovery/tcp/ServerImpl.java | 44 +++++++++++++++----- .../spi/discovery/tcp/TcpDiscoverySpi.java | 31 -------------- .../TcpDiscoveryConnectionCheckMessage.java | 21 +++++++++- ...dTcpCommunicationSpiRecoveryAckSelfTest.java | 2 +- ...entDiscoverySpiFailureThresholdSelfTest.java | 38 ++++++++++++++++- .../tcp/TcpDiscoverySpiConfigSelfTest.java | 3 -- ...TcpDiscoverySpiFailureThresholdSelfTest.java | 9 +--- 8 files changed, 94 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a0edbbc0/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java index 5ed5a6c..6fc5893 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java @@ -191,7 +191,7 @@ public class IgniteConfiguration { public static final boolean DFLT_CACHE_SANITY_CHECK_ENABLED = true; /** Default failure detection threshold used by DiscoverySpi and CommunicationSpi in millis. */ - public static final long DFLT_FAILURE_DETECTION_THRESHOLD = 12_000; + public static final long DFLT_FAILURE_DETECTION_THRESHOLD = 10_000; /** Optional grid name. */ private String gridName; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a0edbbc0/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 56472aa..b085b3d 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -1774,19 +1774,43 @@ class ServerImpl extends TcpDiscoveryImpl { /** Incoming heartbeats check frequency. */ private long hbCheckFreq = (long)spi.maxMissedHbs * spi.hbFreq + 50; + /** Last time heartbeat message has been sent. */ + private long lastTimeHbMsgSent; + /** Time when the last status message has been sent. */ private long lastTimeConnCheckMsgSent; /** Flag that keeps info on whether the threshold is reached or not. */ private boolean failureThresholdReached; - /** Last time hearbeat message has been sent. */ - private long lastTimeHbMsgSent; + /** Connection check frequency. */ + private long connCheckFreq; /** */ protected RingMessageWorker() { super("tcp-disco-msg-worker", 10); + + initConnectionCheckFrequency(); + } + + /** + * Initializes connection check frequency. Used only when failure detection threshold is enabled. + */ + private void initConnectionCheckFrequency() { + if (spi.failureDetectionThresholdEnabled()) { + for (int i = 3; i > 0; i--) { + connCheckFreq = spi.failureDetectionThreshold() / i; + + if (connCheckFreq > 0) + break; + } + + assert connCheckFreq > 0; + + if (log.isDebugEnabled()) + log.debug("Connection check frequency is calculated: " + connCheckFreq); + } } /** @@ -4054,12 +4078,12 @@ class ServerImpl extends TcpDiscoveryImpl { log.info("Local node seems to be disconnected from topology (failure detection threshold " + "is reached): [failureDetectionThreshold=" + spi.failureDetectionThreshold() + - ", connCheckFreq=" + spi.connCheckFreq + ']'); + ", connCheckFreq=" + connCheckFreq + ']'); failureThresholdReached = true; } - long elapsed = (lastTimeConnCheckMsgSent + spi.connCheckFreq) - U.currentTimeMillis(); + long elapsed = (lastTimeConnCheckMsgSent + connCheckFreq) - U.currentTimeMillis(); if (elapsed > 0) return; @@ -4423,7 +4447,12 @@ class ServerImpl extends TcpDiscoveryImpl { if (debugMode && recordable(msg)) debugLog("Message has been received: " + msg); - if (msg instanceof TcpDiscoveryJoinRequestMessage) { + if (msg instanceof TcpDiscoveryConnectionCheckMessage) { + spi.writeToSocket(msg, sock, RES_OK, socketTimeout); + + continue; + } + else if (msg instanceof TcpDiscoveryJoinRequestMessage) { TcpDiscoveryJoinRequestMessage req = (TcpDiscoveryJoinRequestMessage)msg; if (!req.responded()) { @@ -4436,11 +4465,6 @@ class ServerImpl extends TcpDiscoveryImpl { break; } } - else if (msg instanceof TcpDiscoveryConnectionCheckMessage) { - spi.writeToSocket(msg, sock, RES_OK, socketTimeout); - - continue; - } else if (msg instanceof TcpDiscoveryClientReconnectMessage) { if (clientMsgWrk != null) { TcpDiscoverySpiState state = spiStateCopy(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a0edbbc0/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index d754dab..be042eb 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -217,9 +217,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T /** Default max heartbeats count node can miss without failing client node (value is <tt>5</tt>). */ public static final int DFLT_MAX_MISSED_CLIENT_HEARTBEATS = 5; - /** Default connection check frequency. */ - public static final int DFLT_CONN_CHECK_FREQ = 2000; - /** Default IP finder clean frequency in milliseconds (value is <tt>60,000ms</tt>). */ public static final long DFLT_IP_FINDER_CLEAN_FREQ = 60 * 1000; @@ -260,9 +257,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T /** Size of topology snapshots history. */ protected int topHistSize = DFLT_TOP_HISTORY_SIZE; - /** Connection check frequency. Used in conjunction with failure detection threshold. */ - protected long connCheckFreq = DFLT_CONN_CHECK_FREQ; - /** Grid discovery listener. */ protected volatile DiscoverySpiListener lsnr; @@ -855,29 +849,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T } /** - * Sets connection check frequency. Used in conjunction with {@link IgniteConfiguration#failureDetectionThreshold}. - * <p> - * A node sends connection check messages to its next node in the topology with this frequency to check its - * connection status and quickly process a network related error if any. - * <p> - * The way to check connection aliveness with connection check messages is much cheaper than to use heartbeat - * messages. The reason is that a connection check message is only processed by the next node in a topology, - * while a heartbeat message is translated twice across the ring. - * <p> - * Affects server nodes only. - * - * @param connCheckFreq Frequency in milliseconds. - * @return Tcp discovery spi. - * @see IgniteConfiguration#setFailureDetectionThreshold(long) - */ - @IgniteSpiConfiguration(optional = true) - public TcpDiscoverySpi setConnectionCheckFrequency(long connCheckFreq) { - this.connCheckFreq = connCheckFreq; - - return this; - } - - /** * @return Size of topology snapshots history. */ public long getTopHistorySize() { @@ -1674,8 +1645,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T assertParameter(maxAckTimeout > ackTimeout, "maxAckTimeout > ackTimeout"); assertParameter(reconCnt > 0, "reconnectCnt > 0"); } - else - assertParameter(connCheckFreq < failureDetectionThreshold(), "failureDetectionThreshold > connCheckFreq"); assertParameter(netTimeout > 0, "networkTimeout > 0"); assertParameter(ipFinder != null, "ipFinder != null"); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a0edbbc0/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java index 046e2b5..9c8d7cd 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java @@ -20,16 +20,25 @@ package org.apache.ignite.spi.discovery.tcp.messages; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.spi.discovery.tcp.internal.*; +import java.io.*; + /** * Message used to check whether a node is still connected to the topology. * The difference from {@link TcpDiscoveryStatusCheckMessage} is that this message is sent to the next node * which directly replies to the sender without message re-translation to the coordinator. */ -public class TcpDiscoveryConnectionCheckMessage extends TcpDiscoveryAbstractMessage { +public class TcpDiscoveryConnectionCheckMessage extends TcpDiscoveryAbstractMessage implements Externalizable { /** */ private static final long serialVersionUID = 0L; /** + * Default no-arg constructor for {@link Externalizable} interface. + */ + public TcpDiscoveryConnectionCheckMessage() { + // No-op. + } + + /** * Constructor. * * @param creatorNode Node created this message. @@ -42,4 +51,14 @@ public class TcpDiscoveryConnectionCheckMessage extends TcpDiscoveryAbstractMess @Override public String toString() { return S.toString(TcpDiscoveryConnectionCheckMessage.class, this, "super", super.toString()); } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + // No-op + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + // No-op + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a0edbbc0/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java index ccb9717..514f784 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java @@ -166,7 +166,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS @Override public boolean apply() { return recoveryDesc.messagesFutures().isEmpty(); } - }, spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() + 5000 : + }, spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() + 7000 : 10_000); assertEquals("Unexpected messages: " + recoveryDesc.messagesFutures(), 0, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a0edbbc0/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java index 939286d..4c7dbe8 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java @@ -34,6 +34,12 @@ public class TcpClientDiscoverySpiFailureThresholdSelfTest extends TcpClientDisc private final static int FAILURE_AWAIT_TIME = 7_000; /** */ + private final static long FAILURE_THRESHOLD = 10_000; + + /** */ + private static long failureThreshold = FAILURE_THRESHOLD; + + /** */ private static boolean useTestSpi; /** {@inheritDoc} */ @@ -43,7 +49,7 @@ public class TcpClientDiscoverySpiFailureThresholdSelfTest extends TcpClientDisc /** {@inheritDoc} */ @Override protected long failureDetectionThreshold() { - return useTestSpi ? 5000 : 10_000; + return failureThreshold; } /** {@inheritDoc} */ @@ -86,7 +92,35 @@ public class TcpClientDiscoverySpiFailureThresholdSelfTest extends TcpClientDisc /** * @throws Exception in case of error. */ - public void testFailureThresholdWorkability() throws Exception { + public void testFailureThresholdWorkabilityAvgTimeout() throws Exception { + failureThreshold = 3000; + + try { + checkFailureThresholdWorkability(); + } + finally { + failureThreshold = FAILURE_THRESHOLD; + } + } + + /** + * @throws Exception in case of error. + */ + public void testFailureThresholdWorkabilitySmallTimeout() throws Exception { + failureThreshold = 500; + + try { + checkFailureThresholdWorkability(); + } + finally { + failureThreshold = FAILURE_THRESHOLD; + } + } + + /** + * @throws Exception in case of error. + */ + private void checkFailureThresholdWorkability() throws Exception { useTestSpi = true; TestTcpDiscoverySpi firstSpi = null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a0edbbc0/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java index 91f4f9e..8ab2116 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java @@ -42,8 +42,5 @@ public class TcpDiscoverySpiConfigSelfTest extends GridSpiAbstractConfigTest<Tcp checkNegativeSpiProperty(new TcpDiscoverySpi(), "threadPriority", -1); checkNegativeSpiProperty(new TcpDiscoverySpi(), "maxMissedHeartbeats", 0); checkNegativeSpiProperty(new TcpDiscoverySpi(), "statisticsPrintFrequency", 0); - checkNegativeSpiProperty(new TcpDiscoverySpi(), "connectionCheckFrequency", 0); - checkNegativeSpiProperty(new TcpDiscoverySpi(), "connectionCheckFrequency", - IgniteConfiguration.DFLT_FAILURE_DETECTION_THRESHOLD + 1000); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a0edbbc0/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java index 1ee839c..63e79c3 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java @@ -40,9 +40,6 @@ public class TcpDiscoverySpiFailureThresholdSelfTest extends AbstractDiscoverySe private static final int SPI_COUNT = 6; /** */ - private static final long CONN_CHECK_FREQ = 2000; - - /** */ private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); /** {@inheritDoc} */ @@ -59,8 +56,6 @@ public class TcpDiscoverySpiFailureThresholdSelfTest extends AbstractDiscoverySe switch (idx) { case 0: - spi.setConnectionCheckFrequency(CONN_CHECK_FREQ); - break; case 1: // Ignore break; @@ -180,7 +175,7 @@ public class TcpDiscoverySpiFailureThresholdSelfTest extends AbstractDiscoverySe firstSpi().countConnCheckMsg = true; nextSpi.countConnCheckMsg = true; - Thread.sleep(CONN_CHECK_FREQ * 5); + Thread.sleep(firstSpi().failureDetectionThreshold()); firstSpi().countConnCheckMsg = false; nextSpi.countConnCheckMsg = false; @@ -237,7 +232,7 @@ public class TcpDiscoverySpiFailureThresholdSelfTest extends AbstractDiscoverySe firstSpi().countConnCheckMsg = true; nextSpi.countConnCheckMsg = true; - Thread.sleep(CONN_CHECK_FREQ * 5); + Thread.sleep(firstSpi().failureDetectionThreshold() / 2); firstSpi().countConnCheckMsg = false; nextSpi.countConnCheckMsg = false;