ignite-752: applied review nodes proposed by Dmitriy
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/31ab0dd3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/31ab0dd3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/31ab0dd3 Branch: refs/heads/ignite-752 Commit: 31ab0dd306552b5994f90ecbab9c480ce4a8b402 Parents: e81867b Author: dmagda <[email protected]> Authored: Fri Jul 24 14:57:00 2015 +0300 Committer: dmagda <[email protected]> Committed: Fri Jul 24 14:57:00 2015 +0300 ---------------------------------------------------------------------- .../configuration/IgniteConfiguration.java | 51 +-- .../org/apache/ignite/spi/IgniteSpiAdapter.java | 54 +-- .../spi/IgniteSpiOperationTimeoutException.java | 4 +- .../spi/IgniteSpiOperationTimeoutHelper.java | 34 +- .../communication/tcp/TcpCommunicationSpi.java | 60 +-- .../ignite/spi/discovery/tcp/ClientImpl.java | 16 +- .../ignite/spi/discovery/tcp/ServerImpl.java | 60 +-- .../spi/discovery/tcp/TcpDiscoverySpi.java | 53 +-- ...dTcpCommunicationSpiRecoveryAckSelfTest.java | 2 +- ...tionSpiRecoveryFailureDetectionSelfTest.java | 6 +- ...unicationSpiTcpFailureDetectionSelfTest.java | 8 +- ...entDiscoverySpiFailureThresholdSelfTest.java | 205 ---------- ...lientDiscoverySpiFailureTimeoutSelfTest.java | 205 ++++++++++ .../tcp/TcpClientDiscoverySpiSelfTest.java | 16 +- ...TcpDiscoverySpiFailureThresholdSelfTest.java | 400 ------------------ .../TcpDiscoverySpiFailureTimeoutSelfTest.java | 402 +++++++++++++++++++ .../IgniteSpiDiscoverySelfTestSuite.java | 4 +- 17 files changed, 791 insertions(+), 789 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31ab0dd3/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java index f4fc4ec..aac1754 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java @@ -190,8 +190,10 @@ public class IgniteConfiguration { /** Default value for cache sanity check enabled flag. */ public static final boolean DFLT_CACHE_SANITY_CHECK_ENABLED = true; - /** Default failure detection threshold in millis. */ - public static final long DFLT_FAILURE_DETECTION_THRESHOLD = 10_000; + /** Default failure detection timeout in millis. */ + @SuppressWarnings("UnnecessaryBoxing") +// public static final Long DFLT_FAILURE_DETECTION_TIMEOUT = new Long(10_000); + public static final Long DFLT_FAILURE_DETECTION_TIMEOUT = new Long(10_000); /** Optional grid name. */ private String gridName; @@ -370,8 +372,8 @@ public class IgniteConfiguration { /** Port number range for time server. */ private int timeSrvPortRange = DFLT_TIME_SERVER_PORT_RANGE; - /** Failure detection threshold. */ - private long failureDetectionThreshold = DFLT_FAILURE_DETECTION_THRESHOLD; + /** Failure detection timeout. */ + private Long failureDetectionTimeout = DFLT_FAILURE_DETECTION_TIMEOUT; /** Property names to include into node attributes. */ private String[] includeProps; @@ -455,7 +457,7 @@ public class IgniteConfiguration { consistentId = cfg.getConsistentId(); deployMode = cfg.getDeploymentMode(); discoStartupDelay = cfg.getDiscoveryStartupDelay(); - failureDetectionThreshold = cfg.getFailureDetectionThreshold(); + failureDetectionTimeout = cfg.getFailureDetectionTimeout(); ggHome = cfg.getIgniteHome(); ggWork = cfg.getWorkDirectory(); gridName = cfg.getGridName(); @@ -1662,38 +1664,27 @@ public class IgniteConfiguration { } /** - * Returns failure detection threshold used by {@link TcpDiscoverySpi} and {@link TcpCommunicationSpi}. + * Returns failure detection timeout used by {@link TcpDiscoverySpi} and {@link TcpCommunicationSpi}. * <p> - * Default is {@link #DFLT_FAILURE_DETECTION_THRESHOLD}. + * Default is {@link #DFLT_FAILURE_DETECTION_TIMEOUT}. * - * @see #setFailureDetectionThreshold(long) - * @return Failure detection threshold in milliseconds. + * @see #setFailureDetectionTimeout(long) + * @return Failure detection timeout in milliseconds. */ - public long getFailureDetectionThreshold() { - return failureDetectionThreshold; + public Long getFailureDetectionTimeout() { + return failureDetectionTimeout; } /** - * Sets failure detection threshold to use in {@link TcpDiscoverySpi} and {@link TcpCommunicationSpi}. + * Sets failure detection timeout to use in {@link TcpDiscoverySpi} and {@link TcpCommunicationSpi}. * <p> - * If the threshold is set for a server node then it helps to detect failed nodes in a cluster topology during the - * time that is equal to threshold's value and keep working only with the nodes that are alive. - * <p> - * If it's set for a client node then the client node will be able to detect a disconnection from it's router node - * during the time equal to threshold's value. - * <p> - * The failure detection threshold is an easy and straightforward way to setup {@link TcpDiscoverySpi} and - * {@link TcpCommunicationSpi} depending on network conditions of a cluster. On the other hand if advanced setting - * of socket write, acknowledgement timeouts or other parameters is needed it can be done using specific - * {@link TcpDiscoverySpi} and {@link TcpCommunicationSpi} APIs. However, sometimes the failure detection threshold - * will be ignored when such a timeout or parameter is set explicitly. The full list of such timeouts and - * parameters is available as a part of {@link TcpDiscoverySpi} and {@link TcpCommunicationSpi} - * documentations. - * - * @param failureDetectionThreshold Failure detection threshold in milliseconds. - */ - public void setFailureDetectionThreshold(long failureDetectionThreshold) { - this.failureDetectionThreshold = failureDetectionThreshold; + * Failure detection timeout is used to determine how long the communication or discovery SPIs should wait before + * considering a remote connection failed. + * + * @param failureDetectionTimeout Failure detection timeout in milliseconds. + */ + public void setFailureDetectionTimeout(long failureDetectionTimeout) { + this.failureDetectionTimeout = failureDetectionTimeout; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31ab0dd3/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java index 6f5e9e2..739891d 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java @@ -75,14 +75,14 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement /** Local node. */ private ClusterNode locNode; - /** Failure detection threshold usage switch. */ - private boolean failureDetectionThresholdEnabled = true; + /** Failure detection timeout usage switch. */ + private boolean failureDetectionTimeoutEnabled = true; /** - * Failure detection threshold. Initialized with the value of - * {@link IgniteConfiguration#getFailureDetectionThreshold()}. + * Failure detection timeout. Initialized with the value of + * {@link IgniteConfiguration#getFailureDetectionTimeout()}. */ - private long failureDetectionThreshold; + private long failureDetectionTimeout; /** * Creates new adapter and initializes it from the current (this) class. @@ -593,46 +593,52 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement } /** - * Initiates and checks failure detection threshold value. + * Initiates and checks failure detection timeout value. */ - protected void initFailureDetectionThreshold() { - if (failureDetectionThresholdEnabled) { - failureDetectionThreshold = ignite.configuration().getFailureDetectionThreshold(); + protected void initFailureDetectionTimeout() { + if (failureDetectionTimeoutEnabled) { + failureDetectionTimeout = ignite.configuration().getFailureDetectionTimeout(); - if (failureDetectionThreshold <= 0) - throw new IgniteSpiException("Invalid failure detection threshold value: " + failureDetectionThreshold); - else if (failureDetectionThreshold <= 10) + if (failureDetectionTimeout <= 0) + throw new IgniteSpiException("Invalid failure detection timeout value: " + failureDetectionTimeout); + else if (failureDetectionTimeout <= 10) // Because U.currentTimeInMillis() is updated once in 10 milliseconds. - log.warning("Failure detection threshold is too low, it may lead to unpredictable behaviour " + - "[failureDetectionThreshold=" + failureDetectionThreshold + ']'); + log.warning("Failure detection timeout is too low, it may lead to unpredictable behaviour " + + "[failureDetectionTimeout=" + failureDetectionTimeout + ']'); } + // Intentionally compare references using '!=' below + else if (ignite.configuration().getFailureDetectionTimeout() != + IgniteConfiguration.DFLT_FAILURE_DETECTION_TIMEOUT) + log.warning("Failure detection timeout will be ignored (one of " + getClass().getSimpleName() + + " parameters has been set explicitly"); + } /** - * Enables or disables failure detection threshold. + * Enables or disables failure detection timeout. * * @param enabled {@code true} if enable, {@code false} otherwise. */ - public void failureDetectionThresholdEnabled(boolean enabled) { - failureDetectionThresholdEnabled = enabled; + public void failureDetectionTimeoutEnabled(boolean enabled) { + failureDetectionTimeoutEnabled = enabled; } /** - * Checks whether failure detection threshold is enabled for this {@link IgniteSpi}. + * Checks whether failure detection timeout is enabled for this {@link IgniteSpi}. * * @return {@code true} if enabled, {@code false} otherwise. */ - public boolean failureDetectionThresholdEnabled() { - return failureDetectionThresholdEnabled; + public boolean failureDetectionTimeoutEnabled() { + return failureDetectionTimeoutEnabled; } /** - * Returns failure detection threshold set to use for network related operations. + * Returns failure detection timeout set to use for network related operations. * - * @return failure detection threshold in milliseconds or {@code 0} if the threshold is disabled. + * @return failure detection timeout in milliseconds or {@code 0} if the timeout is disabled. */ - public long failureDetectionThreshold() { - return failureDetectionThreshold; + public long failureDetectionTimeout() { + return failureDetectionTimeout; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31ab0dd3/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java index 235fd2b..0e34cf2 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java @@ -23,10 +23,10 @@ import org.apache.ignite.spi.communication.tcp.*; import org.apache.ignite.configuration.*; /** - * Kind of exception that is used when failure detection threshold is enabled for {@link TcpDiscoverySpi} or + * Kind of exception that is used when failure detection timeout is enabled for {@link TcpDiscoverySpi} or * {@link TcpCommunicationSpi}. * - * For more information refer to {@link IgniteConfiguration#setFailureDetectionThreshold(long)} and + * For more information refer to {@link IgniteConfiguration#setFailureDetectionTimeout(long)} and * {@link IgniteSpiOperationTimeoutHelper}. */ public class IgniteSpiOperationTimeoutException extends IgniteCheckedException { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31ab0dd3/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java index 03858d9..f7d8daa 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java @@ -23,7 +23,7 @@ import java.net.*; /** * Object that incorporates logic that determines a timeout value for the next network related operation and checks - * whether a failure detection threshold is reached or not. + * whether a failure detection timeout is reached or not. * * A new instance of the class should be created for every complex network based operations that usually consists of * request and response parts. @@ -36,10 +36,10 @@ public class IgniteSpiOperationTimeoutHelper { private long timeout; /** */ - private final boolean failureDetectionThresholdEnabled; + private final boolean failureDetectionTimeoutEnabled; /** */ - private final long failureDetectionThreshold; + private final long failureDetectionTimeout; /** * Constructor. @@ -47,27 +47,27 @@ public class IgniteSpiOperationTimeoutHelper { * @param adapter SPI adapter. */ public IgniteSpiOperationTimeoutHelper(IgniteSpiAdapter adapter) { - failureDetectionThresholdEnabled = adapter.failureDetectionThresholdEnabled(); - failureDetectionThreshold = adapter.failureDetectionThreshold(); + failureDetectionTimeoutEnabled = adapter.failureDetectionTimeoutEnabled(); + failureDetectionTimeout = adapter.failureDetectionTimeout(); } /** * Returns a timeout value to use for the next network operation. * - * If failure detection threshold is enabled then the returned value is a portion of time left since the last time - * this method is called. If the threshold is disabled then {@code dfltTimeout} is returned. + * If failure detection timeout is enabled then the returned value is a portion of time left since the last time + * this method is called. If the timeout is disabled then {@code dfltTimeout} is returned. * - * @param dfltTimeout Timeout to use if failure detection threshold is disabled. + * @param dfltTimeout Timeout to use if failure detection timeout is disabled. * @return Timeout in milliseconds. - * @throws IgniteSpiOperationTimeoutException If failure detection threshold is reached for an operation that uses + * @throws IgniteSpiOperationTimeoutException If failure detection timeout is reached for an operation that uses * this {@code IgniteSpiOperationTimeoutController}. */ public long nextTimeoutChunk(long dfltTimeout) throws IgniteSpiOperationTimeoutException { - if (!failureDetectionThresholdEnabled) + if (!failureDetectionTimeoutEnabled) return dfltTimeout; if (lastOperStartTs == 0) { - timeout = failureDetectionThreshold; + timeout = failureDetectionTimeout; lastOperStartTs = U.currentTimeMillis(); } else { @@ -79,21 +79,21 @@ public class IgniteSpiOperationTimeoutHelper { if (timeout <= 0) throw new IgniteSpiOperationTimeoutException("Network operation timed out. Increase " + - "'failureDetectionThreshold' configuration property or set SPI specific timeouts" + - " manually. Current failure detection threshold: " + failureDetectionThreshold); + "'failureDetectionTimeout' configuration property [failureDetectionTimeout=" + + failureDetectionTimeout + ']'); } return timeout; } /** - * Checks whether the given {@link Exception} is generated because failure detection threshold has been reached. + * Checks whether the given {@link Exception} is generated because failure detection timeout has been reached. * * @param e Exception. - * @return {@code true} if failure detection threshold is reached, {@code false} otherwise. + * @return {@code true} if failure detection timeout is reached, {@code false} otherwise. */ - public boolean checkThresholdReached(Exception e) { - if (!failureDetectionThresholdEnabled) + public boolean checkFailureTimeoutReached(Exception e) { + if (!failureDetectionTimeoutEnabled) return false; return e instanceof IgniteSpiOperationTimeoutException || e instanceof SocketTimeoutException || http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31ab0dd3/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index b55dde2..7be1dbc 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -75,16 +75,18 @@ import static org.apache.ignite.events.EventType.*; * you own idle connection timeout. * <h1 class="header">Failure Detection</h1> * Configuration defaults (see Configuration section below and - * {@link IgniteConfiguration#getFailureDetectionThreshold()}) for details) are chosen to make possible for + * {@link IgniteConfiguration#getFailureDetectionTimeout()}) for details) are chosen to make possible for * communication SPI work reliably on most of hardware and virtual deployments, but this has made failure detection * time worse. * <p> * If it's needed to tune failure detection then it's highly recommended to do this using - * {@link IgniteConfiguration#setFailureDetectionThreshold(long)}. This is the easiest and most straightforward way - * to setup failure detection basing on network conditions of a cluster. + * {@link IgniteConfiguration#setFailureDetectionTimeout(long)}. This failure timeout automatically controls the + * following parameters: {@link #getConnectTimeout()}, {@link #getMaxConnectTimeout()}, + * {@link #getReconnectCount()}. If any of those parameters is set explicitly, then the failure timeout setting will be + * ignored. * <p> * If it's required to perform advanced settings of failure detection and - * {@link IgniteConfiguration#getFailureDetectionThreshold()} is unsuitable then various {@code TcpCommunicationSpi} + * {@link IgniteConfiguration#getFailureDetectionTimeout()} is unsuitable then various {@code TcpCommunicationSpi} * configuration parameters may be used. * <h1 class="header">Configuration</h1> * <h2 class="header">Mandatory</h2> @@ -1004,7 +1006,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * <p> * If not provided, default value is {@link #DFLT_CONN_TIMEOUT}. * <p> - * When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionThreshold()} is ignored. + * When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionTimeout()} is ignored. * * @param connTimeout Connect timeout. */ @@ -1012,7 +1014,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter public void setConnectTimeout(long connTimeout) { this.connTimeout = connTimeout; - failureDetectionThresholdEnabled(false); + failureDetectionTimeoutEnabled(false); } /** {@inheritDoc} */ @@ -1030,7 +1032,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * <p> * If not provided, default value is {@link #DFLT_MAX_CONN_TIMEOUT}. * <p> - * When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionThreshold()} is ignored. + * When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionTimeout()} is ignored. * * @param maxConnTimeout Maximum connect timeout. */ @@ -1038,7 +1040,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter public void setMaxConnectTimeout(long maxConnTimeout) { this.maxConnTimeout = maxConnTimeout; - failureDetectionThresholdEnabled(false); + failureDetectionTimeoutEnabled(false); } /** {@inheritDoc} */ @@ -1052,7 +1054,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * <p> * If not provided, default value is {@link #DFLT_RECONNECT_CNT}. * <p> - * When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionThreshold()} is ignored. + * When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionTimeout()} is ignored. * * @param reconCnt Maximum number of reconnection attempts. */ @@ -1060,7 +1062,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter public void setReconnectCount(int reconCnt) { this.reconCnt = reconCnt; - failureDetectionThresholdEnabled(false); + failureDetectionTimeoutEnabled(false); } /** {@inheritDoc} */ @@ -1288,7 +1290,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** {@inheritDoc} */ @Override public Map<String, Object> getNodeAttributes() throws IgniteSpiException { - initFailureDetectionThreshold(); + initFailureDetectionTimeout(); assertParameter(locPort > 1023, "locPort > 1023"); assertParameter(locPort <= 0xffff, "locPort < 0xffff"); @@ -1300,7 +1302,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter assertParameter(shmemPort > 0 || shmemPort == -1, "shmemPort > 0 || shmemPort == -1"); assertParameter(selectorsCnt > 0, "selectorsCnt > 0"); - if (!failureDetectionThresholdEnabled()) { + if (!failureDetectionTimeoutEnabled()) { assertParameter(reconCnt > 0, "reconnectCnt > 0"); assertParameter(connTimeout >= 0, "connTimeout >= 0"); assertParameter(maxConnTimeout >= connTimeout, "maxConnTimeout >= connTimeout"); @@ -1382,13 +1384,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter log.debug(configInfo("shmemPort", shmemPort)); log.debug(configInfo("msgQueueLimit", msgQueueLimit)); - if (failureDetectionThresholdEnabled()) { + if (failureDetectionTimeoutEnabled()) { log.debug(configInfo("connTimeout", connTimeout)); log.debug(configInfo("maxConnTimeout", maxConnTimeout)); log.debug(configInfo("reconCnt", reconCnt)); } else - log.debug(configInfo("failureDetectionThreshold", failureDetectionThreshold())); + log.debug(configInfo("failureDetectionTimeout", failureDetectionTimeout())); log.debug(configInfo("sockWriteTimeout", sockWriteTimeout)); log.debug(configInfo("ackSndThreshold", ackSndThreshold)); @@ -1955,7 +1957,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter getSpiContext().messageFormatter()); } catch (IgniteCheckedException e) { - if (timeoutHelper.checkThresholdReached(e)) + if (timeoutHelper.checkFailureTimeoutReached(e)) throw e; // Reconnect for the second time, if connection is not established. @@ -1974,15 +1976,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) { client.forceClose(); - if (failureDetectionThresholdEnabled() && (e instanceof HandshakeTimeoutException || - timeoutHelper.checkThresholdReached(e))) { - log.debug("Handshake timed out (failure threshold reached) [failureDetectionThreshold=" + - failureDetectionThreshold() + ", err=" + e.getMessage() + ", client=" + client + ']'); + if (failureDetectionTimeoutEnabled() && (e instanceof HandshakeTimeoutException || + timeoutHelper.checkFailureTimeoutReached(e))) { + log.debug("Handshake timed out (failure threshold reached) [failureDetectionTimeout=" + + failureDetectionTimeout() + ", err=" + e.getMessage() + ", client=" + client + ']'); throw e; } - assert !failureDetectionThresholdEnabled(); + assert !failureDetectionTimeoutEnabled(); if (log.isDebugEnabled()) log.debug("Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 + @@ -2173,11 +2175,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter client = null; } - if (failureDetectionThresholdEnabled() && (e instanceof HandshakeTimeoutException || - timeoutHelper.checkThresholdReached(e))) { + if (failureDetectionTimeoutEnabled() && (e instanceof HandshakeTimeoutException || + timeoutHelper.checkFailureTimeoutReached(e))) { - String msg = "Handshake timed out (failure detection threshold is reached) " + - "[failureDetectionThreshold=" + failureDetectionThreshold() + ", addr=" + addr + ']'; + String msg = "Handshake timed out (failure detection timeout is reached) " + + "[failureDetectionTimeout=" + failureDetectionTimeout() + ", addr=" + addr + ']'; onException(msg, e); @@ -2195,7 +2197,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter break; } - assert !failureDetectionThresholdEnabled(); + assert !failureDetectionTimeoutEnabled(); onException("Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 + ", addr=" + addr + ']', e); @@ -2242,12 +2244,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (log.isDebugEnabled()) log.debug("Client creation failed [addr=" + addr + ", err=" + e + ']'); - boolean failureDetThrReached = timeoutHelper.checkThresholdReached(e); + boolean failureDetThrReached = timeoutHelper.checkFailureTimeoutReached(e); if (failureDetThrReached) - LT.warn(log, null, "Connect timed out (consider increasing 'failureDetectionThreshold' " + - "configuration property) [addr=" + addr + ", failureDetectionThreshold=" + - failureDetectionThreshold() + ']'); + LT.warn(log, null, "Connect timed out (consider increasing 'failureDetectionTimeout' " + + "configuration property) [addr=" + addr + ", failureDetectionTimeout=" + + failureDetectionTimeout() + ']'); else if (X.hasCause(e, SocketTimeoutException.class)) LT.warn(log, null, "Connect timed out (consider increasing 'connTimeout' " + "configuration property) [addr=" + addr + ", connTimeout=" + connTimeout + ']'); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31ab0dd3/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 213ea63..12b10b8 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -560,10 +560,10 @@ class ClientImpl extends TcpDiscoveryImpl { errs.add(e); - if (timeoutHelper.checkThresholdReached(e)) + if (timeoutHelper.checkFailureTimeoutReached(e)) break; - if (!spi.failureDetectionThresholdEnabled() && ++reconCnt == spi.getReconnectCount()) + if (!spi.failureDetectionTimeoutEnabled() && ++reconCnt == spi.getReconnectCount()) break; if (!openSock) { @@ -577,7 +577,7 @@ class ClientImpl extends TcpDiscoveryImpl { break; // Don't retry if we can not establish connection. } - if (!spi.failureDetectionThresholdEnabled() && (e instanceof SocketTimeoutException || + if (!spi.failureDetectionTimeoutEnabled() && (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class))) { ackTimeout0 *= 2; @@ -891,7 +891,7 @@ class ClientImpl extends TcpDiscoveryImpl { protected SocketWriter() { super(spi.ignite().name(), "tcp-client-disco-sock-writer", log); - socketTimeout = spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() : + socketTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() : spi.getSocketTimeout(); } @@ -991,8 +991,8 @@ class ClientImpl extends TcpDiscoveryImpl { msg = null; if (ack) { - long waitEnd = U.currentTimeMillis() + (spi.failureDetectionThresholdEnabled() ? - spi.failureDetectionThreshold() : spi.getAckTimeout()); + long waitEnd = U.currentTimeMillis() + (spi.failureDetectionTimeoutEnabled() ? + spi.failureDetectionTimeout() : spi.getAckTimeout()); TcpDiscoveryAbstractMessage unacked; @@ -1009,8 +1009,8 @@ class ClientImpl extends TcpDiscoveryImpl { if (log.isDebugEnabled()) log.debug("Failed to get acknowledge for message, will try to reconnect " + "[msg=" + unacked + - (spi.failureDetectionThresholdEnabled() ? - ", failureDetectionThreshold=" + spi.failureDetectionThreshold() : + (spi.failureDetectionTimeoutEnabled() ? + ", failureDetectionTimeout=" + spi.failureDetectionTimeout() : ", timeout=" + spi.getAckTimeout()) + ']'); throw new IOException("Failed to get acknowledge for message: " + unacked); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31ab0dd3/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 99a3ee2..b4f89ec 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -546,9 +546,9 @@ class ServerImpl extends TcpDiscoveryImpl { if (!openedSock && reconCnt == 2) break; - if (timeoutHelper.checkThresholdReached(e)) + if (timeoutHelper.checkFailureTimeoutReached(e)) break; - else if (!spi.failureDetectionThresholdEnabled() && reconCnt == spi.getReconnectCount()) + else if (!spi.failureDetectionTimeoutEnabled() && reconCnt == spi.getReconnectCount()) break; } finally { @@ -609,7 +609,7 @@ class ServerImpl extends TcpDiscoveryImpl { /** {@inheritDoc} */ @Override protected void onDataReceived() { - if (spi.failureDetectionThresholdEnabled() && locNode != null) + if (spi.failureDetectionTimeoutEnabled() && locNode != null) locNode.lastDataReceivedTime(U.currentTimeMillis()); } @@ -979,10 +979,10 @@ class ServerImpl extends TcpDiscoveryImpl { errs.add(e); - if (timeoutHelper.checkThresholdReached(e)) + if (timeoutHelper.checkFailureTimeoutReached(e)) break; - if (!spi.failureDetectionThresholdEnabled() && ++reconCnt == spi.getReconnectCount()) + if (!spi.failureDetectionTimeoutEnabled() && ++reconCnt == spi.getReconnectCount()) break; if (!openSock) { @@ -996,7 +996,7 @@ class ServerImpl extends TcpDiscoveryImpl { break; // Don't retry if we can not establish connection. } - if (!spi.failureDetectionThresholdEnabled() && (e instanceof SocketTimeoutException || + if (!spi.failureDetectionTimeoutEnabled() && (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class))) { ackTimeout0 *= 2; @@ -1795,12 +1795,12 @@ class ServerImpl extends TcpDiscoveryImpl { } /** - * Initializes connection check frequency. Used only when failure detection threshold is enabled. + * Initializes connection check frequency. Used only when failure detection timeout is enabled. */ private void initConnectionCheckFrequency() { - if (spi.failureDetectionThresholdEnabled()) { + if (spi.failureDetectionTimeoutEnabled()) { for (int i = 3; i > 0; i--) { - connCheckFreq = spi.failureDetectionThreshold() / i; + connCheckFreq = spi.failureDetectionTimeout() / i; if (connCheckFreq > 0) break; @@ -2112,12 +2112,12 @@ class ServerImpl extends TcpDiscoveryImpl { if (!openSock) break; // Don't retry if we can not establish connection. - if (!spi.failureDetectionThresholdEnabled() && ++reconCnt == spi.getReconnectCount()) + if (!spi.failureDetectionTimeoutEnabled() && ++reconCnt == spi.getReconnectCount()) break; - if (timeoutHelper.checkThresholdReached(e)) + if (timeoutHelper.checkFailureTimeoutReached(e)) break; - else if (!spi.failureDetectionThresholdEnabled() && (e instanceof + else if (!spi.failureDetectionTimeoutEnabled() && (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class))) { ackTimeout0 *= 2; @@ -2262,10 +2262,10 @@ class ServerImpl extends TcpDiscoveryImpl { onException("Failed to send message to next node [next=" + next.id() + ", msg=" + msg + ']', e); - if (timeoutHelper.checkThresholdReached(e)) + if (timeoutHelper.checkFailureTimeoutReached(e)) break; - if (!spi.failureDetectionThresholdEnabled()) { + if (!spi.failureDetectionTimeoutEnabled()) { if (++reconCnt == spi.getReconnectCount()) break; else if (e instanceof SocketTimeoutException || @@ -2287,7 +2287,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (log.isDebugEnabled()) log.debug("Message has not been sent [next=" + next.id() + ", msg=" + msg + - (!spi.failureDetectionThresholdEnabled() ? ", i=" + reconCnt : "") + ']'); + (!spi.failureDetectionTimeoutEnabled() ? ", i=" + reconCnt : "") + ']'); } } } // Try to reconnect. @@ -3350,8 +3350,8 @@ class ServerImpl extends TcpDiscoveryImpl { } else if (leftNode.equals(next) && sock != null) { try { - writeToSocket(sock, msg, spi.failureDetectionThresholdEnabled() ? - spi.failureDetectionThreshold() : spi.getSocketTimeout()); + writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ? + spi.failureDetectionTimeout() : spi.getSocketTimeout()); if (log.isDebugEnabled()) log.debug("Sent verified node left message to leaving node: " + msg); @@ -4044,15 +4044,15 @@ class ServerImpl extends TcpDiscoveryImpl { * Check connection aliveness status. */ private void checkConnection() { - if (!spi.failureDetectionThresholdEnabled()) + if (!spi.failureDetectionTimeoutEnabled()) return; if (!failureThresholdReached && U.currentTimeMillis() - locNode.lastDataReceivedTime() - >= spi.failureDetectionThreshold() && ring.hasRemoteNodes() && spiStateCopy() == CONNECTED) { + >= spi.failureDetectionTimeout() && ring.hasRemoteNodes() && spiStateCopy() == CONNECTED) { - log.info("Local node seems to be disconnected from topology (failure detection threshold " + - "is reached): [failureDetectionThreshold=" + spi.failureDetectionThreshold() + - ", connCheckFreq=" + connCheckFreq + ']'); + log.info("Local node seems to be disconnected from topology (failure detection timeout " + + "is reached): [failureDetectionTimeout=" + spi.failureDetectionTimeout() + + ", connCheckFreq=" + connCheckFreq + ']'); failureThresholdReached = true; @@ -4297,8 +4297,8 @@ class ServerImpl extends TcpDiscoveryImpl { if (req.client()) res.clientAck(true); - spi.writeToSocket(sock, res, spi.failureDetectionThresholdEnabled() ? - spi.failureDetectionThreshold() : spi.getSocketTimeout()); + spi.writeToSocket(sock, res, spi.failureDetectionTimeoutEnabled() ? + spi.failureDetectionTimeout() : spi.getSocketTimeout()); // It can happen if a remote node is stopped and it has a loopback address in the list of addresses, // the local node sends a handshake request message on the loopback address, so we get here. @@ -4407,7 +4407,7 @@ class ServerImpl extends TcpDiscoveryImpl { return; } - long socketTimeout = spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() : + long socketTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() : spi.getSocketTimeout(); while (!isInterrupted()) { @@ -4702,7 +4702,7 @@ class ServerImpl extends TcpDiscoveryImpl { TcpDiscoverySpiState state = spiStateCopy(); - long socketTimeout = spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() : + long socketTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() : spi.getSocketTimeout(); if (state == CONNECTED) { @@ -4886,8 +4886,8 @@ class ServerImpl extends TcpDiscoveryImpl { log.debug("Sending message ack to client [sock=" + sock + ", locNodeId=" + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']'); - writeToSocket(sock, msg, spi.failureDetectionThresholdEnabled() ? - spi.failureDetectionThreshold() : spi.getSocketTimeout()); + writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ? + spi.failureDetectionTimeout() : spi.getSocketTimeout()); } } else { @@ -4898,8 +4898,8 @@ class ServerImpl extends TcpDiscoveryImpl { prepareNodeAddedMessage(msg, clientNodeId, null, null); - writeToSocket(sock, msg, spi.failureDetectionThresholdEnabled() ? - spi.failureDetectionThreshold() : spi.getSocketTimeout()); + writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ? + spi.failureDetectionTimeout() : spi.getSocketTimeout()); } finally { clearNodeAddedMessage(msg); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31ab0dd3/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index be042eb..6130bd7 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -66,15 +66,17 @@ import java.util.concurrent.atomic.*; * and issues node added messages and all other nodes then receive info about new node. * <h1 class="header">Failure Detection</h1> * Configuration defaults (see Configuration section below and - * {@link IgniteConfiguration#getFailureDetectionThreshold()}) for details) are chosen to make possible for discovery + * {@link IgniteConfiguration#getFailureDetectionTimeout()}) for details) are chosen to make possible for discovery * SPI work reliably on most of hardware and virtual deployments, but this has made failure detection time worse. * <p> * If it's needed to tune failure detection then it's highly recommended to do this using - * {@link IgniteConfiguration#setFailureDetectionThreshold(long)}. This is the easiest and most straightforward way - * to setup failure detection basing on network conditions of a cluster. + * {@link IgniteConfiguration#setFailureDetectionTimeout(long)}. This failure timeout automatically controls the + * following parameters: {@link #getSocketTimeout()}, {@link #getAckTimeout()}, {@link #getMaxAckTimeout()}, + * {@link #getReconnectCount()}. If any of those parameters is set explicitly, then the failure timeout setting will be + * ignored. * <p> * If it's required to perform advanced settings of failure detection and - * {@link IgniteConfiguration#getFailureDetectionThreshold()} is unsuitable then various {@code TcpDiscoverySpi} + * {@link IgniteConfiguration#getFailureDetectionTimeout()} is unsuitable then various {@code TcpDiscoverySpi} * configuration parameters may be used. As an example, for stable low-latency networks the following more aggressive * settings are recommended (which allows failure detection time ~200ms): * <ul> @@ -163,13 +165,13 @@ import java.util.concurrent.atomic.*; @DiscoverySpiOrderSupport(true) @DiscoverySpiHistorySupport(true) public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, TcpDiscoverySpiMBean { - /** Failure detection threshold feature major version. */ + /** Failure detection timeout feature major version. */ final static byte FAILURE_DETECTION_MAJOR_VER = 1; - /** Failure detection threshold feature minor version. */ + /** Failure detection timeout feature minor version. */ final static byte FAILURE_DETECTION_MINOR_VER = 4; - /** Failure detection threshold feature maintainance version. */ + /** Failure detection timeout feature maintainance version. */ final static byte FAILURE_DETECTION_MAINT_VER = 1; /** Node attribute that is mapped to node's external addresses (value is <tt>disc.tcp.ext-addrs</tt>). */ @@ -537,7 +539,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T * <p> * If not specified, default is {@link #DFLT_RECONNECT_CNT}. * <p> - * When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionThreshold()} is ignored. + * When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionTimeout()} is ignored. * * @param reconCnt Number of retries during message sending. * @see #setAckTimeout(long) @@ -546,7 +548,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T public TcpDiscoverySpi setReconnectCount(int reconCnt) { this.reconCnt = reconCnt; - failureDetectionThresholdEnabled(false); + failureDetectionTimeoutEnabled(false); return this; } @@ -568,7 +570,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T * <p> * Affected server nodes only. * <p> - * When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionThreshold()} is ignored. + * When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionTimeout()} is ignored. * * @param maxAckTimeout Maximum acknowledgement timeout. */ @@ -576,7 +578,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T public TcpDiscoverySpi setMaxAckTimeout(long maxAckTimeout) { this.maxAckTimeout = maxAckTimeout; - failureDetectionThresholdEnabled(false); + failureDetectionTimeoutEnabled(false); return this; } @@ -745,7 +747,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T * <p> * If not specified, default is {@link #DFLT_SOCK_TIMEOUT} or {@link #DFLT_SOCK_TIMEOUT_CLIENT}. * <p> - * When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionThreshold()} is ignored. + * When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionTimeout()} is ignored. * * @param sockTimeout Socket connection timeout. */ @@ -753,7 +755,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T public TcpDiscoverySpi setSocketTimeout(long sockTimeout) { this.sockTimeout = sockTimeout; - failureDetectionThresholdEnabled(false); + failureDetectionTimeoutEnabled(false); return this; } @@ -766,7 +768,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T * <p> * If not specified, default is {@link #DFLT_ACK_TIMEOUT} or {@link #DFLT_ACK_TIMEOUT_CLIENT}. * <p> - * When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionThreshold()} is ignored. + * When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionTimeout()} is ignored. * * @param ackTimeout Acknowledgement timeout. */ @@ -774,7 +776,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T public TcpDiscoverySpi setAckTimeout(long ackTimeout) { this.ackTimeout = ackTimeout; - failureDetectionThresholdEnabled(false); + failureDetectionTimeoutEnabled(false); return this; } @@ -1616,7 +1618,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T /** {@inheritDoc} */ @Override public void spiStart(@Nullable String gridName) throws IgniteSpiException { - initFailureDetectionThreshold(); + initFailureDetectionTimeout(); if (!forceSrvMode && (Boolean.TRUE.equals(ignite.configuration().isClientMode()))) { if (ackTimeout == 0) @@ -1639,7 +1641,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T impl = new ServerImpl(this); } - if (!failureDetectionThresholdEnabled()) { + if (!failureDetectionTimeoutEnabled()) { assertParameter(sockTimeout > 0, "sockTimeout > 0"); assertParameter(ackTimeout > 0, "ackTimeout > 0"); assertParameter(maxAckTimeout > ackTimeout, "maxAckTimeout > ackTimeout"); @@ -1672,10 +1674,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T log.debug(configInfo("localPortRange", locPortRange)); log.debug(configInfo("threadPri", threadPri)); - if (!failureDetectionThresholdEnabled()) { - log.debug("Failure detection threshold is disabled and connection check frequency is ignored because " + - "at least one of the parameters from this list has been set manually: 'networkTimeout'," + - " 'sockTimeout', 'ackTimeout', 'maxAckTimeout', 'reconnectCount'."); + if (!failureDetectionTimeoutEnabled()) { + log.debug("Failure detection timeout is ignored because at least one of the parameters from this list" + + " has been set explicitly: 'sockTimeout', 'ackTimeout', 'maxAckTimeout', 'reconnectCount'."); log.debug(configInfo("networkTimeout", netTimeout)); log.debug(configInfo("sockTimeout", sockTimeout)); @@ -1684,7 +1685,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T log.debug(configInfo("reconnectCount", reconCnt)); } else - log.debug(configInfo("failureDetectionThreshold", failureDetectionThreshold())); + log.debug(configInfo("failureDetectionTimeout", failureDetectionTimeout())); log.debug(configInfo("ipFinder", ipFinder)); log.debug(configInfo("ipFinderCleanFreq", ipFinderCleanFreq)); @@ -1694,7 +1695,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T } // Warn on odd network timeout. - if (!failureDetectionThresholdEnabled() && netTimeout < 3000) + if (netTimeout < 3000) U.warn(log, "Network timeout is too low (at least 3000 ms recommended): " + netTimeout); registerMBean(gridName, this, TcpDiscoverySpiMBean.class); @@ -1898,9 +1899,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T U.closeQuiet(sock); LT.warn(log, null, "Socket write has timed out (consider increasing " + - (failureDetectionThresholdEnabled() ? - "'IgniteConfiguration.failureDetectionThreshold' configuration property) [" + - "failureDetectionThreshold=" + failureDetectionThreshold() + ']' : + (failureDetectionTimeoutEnabled() ? + "'IgniteConfiguration.failureDetectionTimeout' configuration property) [" + + "failureDetectionTimeout=" + failureDetectionTimeout() + ']' : "'sockTimeout' configuration property) [sockTimeout=" + sockTimeout + ']')); stats.onSocketTimeout(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31ab0dd3/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java index 514f784..b4090d0 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java @@ -166,7 +166,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS @Override public boolean apply() { return recoveryDesc.messagesFutures().isEmpty(); } - }, spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() + 7000 : + }, spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() + 7000 : 10_000); assertEquals("Unexpected messages: " + recoveryDesc.messagesFutures(), 0, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31ab0dd3/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest.java index 7d10316..a6bfe00 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest.java @@ -39,7 +39,7 @@ public class GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest extends Gri /** {@inheritDoc} */ @Override protected long awaitForSocketWriteTimeout() { - return IgniteConfiguration.DFLT_FAILURE_DETECTION_THRESHOLD + 5_000; + return IgniteConfiguration.DFLT_FAILURE_DETECTION_TIMEOUT + 5_000; } /** @@ -47,8 +47,8 @@ public class GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest extends Gri */ public void testFailureDetectionEnabled() throws Exception { for (TcpCommunicationSpi spi: spis) { - assertTrue(spi.failureDetectionThresholdEnabled()); - assertTrue(spi.failureDetectionThreshold() == IgniteConfiguration.DFLT_FAILURE_DETECTION_THRESHOLD); + assertTrue(spi.failureDetectionTimeoutEnabled()); + assertTrue(spi.failureDetectionTimeout() == IgniteConfiguration.DFLT_FAILURE_DETECTION_TIMEOUT); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31ab0dd3/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpFailureDetectionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpFailureDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpFailureDetectionSelfTest.java index a525107..56873d1 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpFailureDetectionSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpFailureDetectionSelfTest.java @@ -64,12 +64,12 @@ public class GridTcpCommunicationSpiTcpFailureDetectionSelfTest extends GridTcpC * @throws Exception if failed. */ public void testFailureDetectionEnabled() throws Exception { - assertTrue(spis[0].failureDetectionThresholdEnabled()); - assertTrue(spis[0].failureDetectionThreshold() == IgniteConfiguration.DFLT_FAILURE_DETECTION_THRESHOLD); + assertTrue(spis[0].failureDetectionTimeoutEnabled()); + assertTrue(spis[0].failureDetectionTimeout() == IgniteConfiguration.DFLT_FAILURE_DETECTION_TIMEOUT); for (int i = 1; i < SPI_COUNT; i++) { - assertFalse(spis[i].failureDetectionThresholdEnabled()); - assertEquals(0, spis[i].failureDetectionThreshold()); + assertFalse(spis[i].failureDetectionTimeoutEnabled()); + assertEquals(0, spis[i].failureDetectionTimeout()); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31ab0dd3/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java deleted file mode 100644 index 84a11cd..0000000 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java +++ /dev/null @@ -1,205 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.spi.discovery.tcp; - -import org.apache.ignite.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.spi.discovery.tcp.messages.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.net.*; - -/** - * Client-based discovery SPI test with failure detection threshold enabled. - */ -public class TcpClientDiscoverySpiFailureThresholdSelfTest extends TcpClientDiscoverySpiSelfTest { - /** */ - private final static int FAILURE_AWAIT_TIME = 7_000; - - /** */ - private final static long FAILURE_THRESHOLD = 10_000; - - /** */ - private static long failureThreshold = FAILURE_THRESHOLD; - - /** */ - private static boolean useTestSpi; - - /** {@inheritDoc} */ - @Override protected boolean useFailureDetectionThreshold() { - return true; - } - - /** {@inheritDoc} */ - @Override protected long failureDetectionThreshold() { - return failureThreshold; - } - - /** {@inheritDoc} */ - @Override protected long awaitTime() { - return failureDetectionThreshold() + FAILURE_AWAIT_TIME; - } - - /** {@inheritDoc} */ - @Override protected TcpDiscoverySpi getDiscoverySpi() { - return useTestSpi ? new TestTcpDiscoverySpi() : super.getDiscoverySpi(); - } - - /** - * @throws Exception in case of error. - */ - public void testFailureDetectionThresholdEnabled() throws Exception { - startServerNodes(1); - startClientNodes(1); - - checkNodes(1, 1); - - assertTrue(((TcpDiscoverySpi)(G.ignite("server-0").configuration().getDiscoverySpi())). - failureDetectionThresholdEnabled()); - assertEquals(failureDetectionThreshold(), - ((TcpDiscoverySpi)(G.ignite("server-0").configuration().getDiscoverySpi())).failureDetectionThreshold()); - - assertTrue(((TcpDiscoverySpi)(G.ignite("client-0").configuration().getDiscoverySpi())). - failureDetectionThresholdEnabled()); - assertEquals(failureDetectionThreshold(), - ((TcpDiscoverySpi)(G.ignite("client-0").configuration().getDiscoverySpi())).failureDetectionThreshold()); - } - - /** - * @throws Exception in case of error. - */ - public void testFailureThresholdWorkabilityAvgTimeout() throws Exception { - failureThreshold = 3000; - - try { - checkFailureThresholdWorkability(); - } - finally { - failureThreshold = FAILURE_THRESHOLD; - } - } - - /** - * @throws Exception in case of error. - */ - public void testFailureThresholdWorkabilitySmallTimeout() throws Exception { - failureThreshold = 500; - - try { - checkFailureThresholdWorkability(); - } - finally { - failureThreshold = FAILURE_THRESHOLD; - } - } - - /** - * @throws Exception in case of error. - */ - private void checkFailureThresholdWorkability() throws Exception { - useTestSpi = true; - - TestTcpDiscoverySpi firstSpi = null; - TestTcpDiscoverySpi secondSpi = null; - - try { - startServerNodes(2); - - checkNodes(2, 0); - - firstSpi = (TestTcpDiscoverySpi)(G.ignite("server-0").configuration().getDiscoverySpi()); - secondSpi = (TestTcpDiscoverySpi)(G.ignite("server-1").configuration().getDiscoverySpi()); - - assert firstSpi.err == null; - - secondSpi.readDelay = failureDetectionThreshold() + 5000; - - assertFalse(firstSpi.pingNode(secondSpi.getLocalNodeId())); - - Thread.sleep(failureDetectionThreshold()); - - assertTrue(firstSpi.err != null && X.hasCause(firstSpi.err, SocketTimeoutException.class)); - - firstSpi.reset(); - secondSpi.reset(); - - assertTrue(firstSpi.pingNode(secondSpi.getLocalNodeId())); - - assertTrue(firstSpi.err == null); - } - finally { - useTestSpi = false; - - if (firstSpi != null) - firstSpi.reset(); - - if (secondSpi != null) - secondSpi.reset(); - } - } - - /** - * - */ - private static class TestTcpDiscoverySpi extends TcpDiscoverySpi { - /** */ - private long readDelay; - - /** */ - private Exception err; - - /** {@inheritDoc} */ - @Override protected <T> T readMessage(Socket sock, @Nullable InputStream in, long timeout) - throws IOException, IgniteCheckedException { - - if (readDelay < failureDetectionThreshold()) { - try { - return super.readMessage(sock, in, timeout); - } - catch (Exception e) { - err = e; - - throw e; - } - } - else { - T msg = super.readMessage(sock, in, timeout); - - if (msg instanceof TcpDiscoveryPingRequest) { - try { - Thread.sleep(2000); - } catch (InterruptedException e) { - // Ignore - } - throw new SocketTimeoutException("Forced timeout"); - } - - return msg; - } - } - - /** - * Resets testing state. - */ - private void reset() { - readDelay = 0; - err = null; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31ab0dd3/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java new file mode 100644 index 0000000..3cf44f2 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.spi.discovery.tcp.messages.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.net.*; + +/** + * Client-based discovery SPI test with failure detection timeout enabled. + */ +public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscoverySpiSelfTest { + /** */ + private final static int FAILURE_AWAIT_TIME = 7_000; + + /** */ + private final static long FAILURE_THRESHOLD = 10_000; + + /** */ + private static long failureThreshold = FAILURE_THRESHOLD; + + /** */ + private static boolean useTestSpi; + + /** {@inheritDoc} */ + @Override protected boolean useFailureDetectionTimeout() { + return true; + } + + /** {@inheritDoc} */ + @Override protected long failureDetectionTimeout() { + return failureThreshold; + } + + /** {@inheritDoc} */ + @Override protected long awaitTime() { + return failureDetectionTimeout() + FAILURE_AWAIT_TIME; + } + + /** {@inheritDoc} */ + @Override protected TcpDiscoverySpi getDiscoverySpi() { + return useTestSpi ? new TestTcpDiscoverySpi() : super.getDiscoverySpi(); + } + + /** + * @throws Exception in case of error. + */ + public void testFailureDetectionTimeoutEnabled() throws Exception { + startServerNodes(1); + startClientNodes(1); + + checkNodes(1, 1); + + assertTrue(((TcpDiscoverySpi)(G.ignite("server-0").configuration().getDiscoverySpi())). + failureDetectionTimeoutEnabled()); + assertEquals(failureDetectionTimeout(), + ((TcpDiscoverySpi)(G.ignite("server-0").configuration().getDiscoverySpi())).failureDetectionTimeout()); + + assertTrue(((TcpDiscoverySpi)(G.ignite("client-0").configuration().getDiscoverySpi())). + failureDetectionTimeoutEnabled()); + assertEquals(failureDetectionTimeout(), + ((TcpDiscoverySpi)(G.ignite("client-0").configuration().getDiscoverySpi())).failureDetectionTimeout()); + } + + /** + * @throws Exception in case of error. + */ + public void testFailureTimeoutWorkabilityAvgTimeout() throws Exception { + failureThreshold = 3000; + + try { + checkFailureThresholdWorkability(); + } + finally { + failureThreshold = FAILURE_THRESHOLD; + } + } + + /** + * @throws Exception in case of error. + */ + public void testFailureTimeoutWorkabilitySmallTimeout() throws Exception { + failureThreshold = 500; + + try { + checkFailureThresholdWorkability(); + } + finally { + failureThreshold = FAILURE_THRESHOLD; + } + } + + /** + * @throws Exception in case of error. + */ + private void checkFailureThresholdWorkability() throws Exception { + useTestSpi = true; + + TestTcpDiscoverySpi firstSpi = null; + TestTcpDiscoverySpi secondSpi = null; + + try { + startServerNodes(2); + + checkNodes(2, 0); + + firstSpi = (TestTcpDiscoverySpi)(G.ignite("server-0").configuration().getDiscoverySpi()); + secondSpi = (TestTcpDiscoverySpi)(G.ignite("server-1").configuration().getDiscoverySpi()); + + assert firstSpi.err == null; + + secondSpi.readDelay = failureDetectionTimeout() + 5000; + + assertFalse(firstSpi.pingNode(secondSpi.getLocalNodeId())); + + Thread.sleep(failureDetectionTimeout()); + + assertTrue(firstSpi.err != null && X.hasCause(firstSpi.err, SocketTimeoutException.class)); + + firstSpi.reset(); + secondSpi.reset(); + + assertTrue(firstSpi.pingNode(secondSpi.getLocalNodeId())); + + assertTrue(firstSpi.err == null); + } + finally { + useTestSpi = false; + + if (firstSpi != null) + firstSpi.reset(); + + if (secondSpi != null) + secondSpi.reset(); + } + } + + /** + * + */ + private static class TestTcpDiscoverySpi extends TcpDiscoverySpi { + /** */ + private long readDelay; + + /** */ + private Exception err; + + /** {@inheritDoc} */ + @Override protected <T> T readMessage(Socket sock, @Nullable InputStream in, long timeout) + throws IOException, IgniteCheckedException { + + if (readDelay < failureDetectionTimeout()) { + try { + return super.readMessage(sock, in, timeout); + } + catch (Exception e) { + err = e; + + throw e; + } + } + else { + T msg = super.readMessage(sock, in, timeout); + + if (msg instanceof TcpDiscoveryPingRequest) { + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + // Ignore + } + throw new SocketTimeoutException("Forced timeout"); + } + + return msg; + } + } + + /** + * Resets testing state. + */ + private void reset() { + readDelay = 0; + err = null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31ab0dd3/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java index 7e1f415..69a5f13 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java @@ -154,8 +154,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { else throw new IllegalArgumentException(); - if (useFailureDetectionThreshold()) - cfg.setFailureDetectionThreshold(failureDetectionThreshold()); + if (useFailureDetectionTimeout()) + cfg.setFailureDetectionTimeout(failureDetectionTimeout()); else { if (longSockTimeouts) { disco.setAckTimeout(2000); @@ -224,20 +224,20 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { } /** - * Checks whether to use failure detection threshold instead of setting explicit timeouts. + * Checks whether to use failure detection timeout instead of setting explicit timeouts. * * @return {@code true} if use. */ - protected boolean useFailureDetectionThreshold() { + protected boolean useFailureDetectionTimeout() { return false; } /** - * Gets failure detection threshold to use. + * Gets failure detection timeout to use. * - * @return Failure detection threshold. + * @return Failure detection timeout. */ - protected long failureDetectionThreshold() { + protected long failureDetectionTimeout() { return 0; } @@ -457,7 +457,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { Ignite srv1 = G.ignite("server-1"); Ignite client = G.ignite("client-0"); - if (!useFailureDetectionThreshold()) + if (!useFailureDetectionTimeout()) ((TcpDiscoverySpi)srv0.configuration().getDiscoverySpi()).setAckTimeout(1000); ((TestTcpDiscoverySpi)client.configuration().getDiscoverySpi()).pauseSocketWrite(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31ab0dd3/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java deleted file mode 100644 index 63e79c3..0000000 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java +++ /dev/null @@ -1,400 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.spi.discovery.tcp; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.util.io.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.spi.*; -import org.apache.ignite.spi.discovery.*; -import org.apache.ignite.spi.discovery.tcp.internal.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.spi.discovery.tcp.messages.*; - -import java.io.*; -import java.net.*; - -/** - * - */ -public class TcpDiscoverySpiFailureThresholdSelfTest extends AbstractDiscoverySelfTest { - /** */ - private static final int SPI_COUNT = 6; - - /** */ - private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - - /** {@inheritDoc} */ - @Override protected int getSpiCount() { - return SPI_COUNT; - } - - /** {@inheritDoc} */ - @Override protected DiscoverySpi getSpi(int idx) { - TestTcpDiscoverySpi spi = new TestTcpDiscoverySpi(); - - spi.setMetricsProvider(createMetricsProvider()); - spi.setIpFinder(ipFinder); - - switch (idx) { - case 0: - case 1: - // Ignore - break; - case 2: - spi.setAckTimeout(3000); - break; - case 3: - spi.setSocketTimeout(4000); - break; - case 4: - spi.setReconnectCount(4); - break; - case 5: - spi.setMaxAckTimeout(10000); - break; - default: - assert false; - } - - return spi; - } - - /** - * @throws Exception In case of error. - */ - public void testFailureDetectionThresholdEnabled() throws Exception { - assertTrue(firstSpi().failureDetectionThresholdEnabled()); - assertTrue(secondSpi().failureDetectionThresholdEnabled()); - - assertEquals(IgniteConfiguration.DFLT_FAILURE_DETECTION_THRESHOLD, firstSpi().failureDetectionThreshold()); - assertEquals(IgniteConfiguration.DFLT_FAILURE_DETECTION_THRESHOLD, secondSpi().failureDetectionThreshold()); - } - - /** - * @throws Exception In case of error. - */ - public void testFailureDetectionThresholdDisabled() throws Exception { - for (int i = 2; i < spis.size(); i++) { - assertFalse(((TcpDiscoverySpi)spis.get(i)).failureDetectionThresholdEnabled()); - assertEquals(0, ((TcpDiscoverySpi)spis.get(i)).failureDetectionThreshold()); - } - } - - /** - * @throws Exception In case of error. - */ - public void testFailureDetectionOnSocketOpen() throws Exception { - try { - ClusterNode node = secondSpi().getLocalNode(); - - firstSpi().openSocketTimeout = true; - - assertFalse(firstSpi().pingNode(node.id())); - assertTrue(firstSpi().validTimeout); - assertTrue(firstSpi().err.getMessage().equals("Timeout: openSocketTimeout")); - - firstSpi().openSocketTimeout = false; - firstSpi().openSocketTimeoutWait = true; - - assertFalse(firstSpi().pingNode(node.id())); - assertTrue(firstSpi().validTimeout); - assertTrue(firstSpi().err.getMessage().equals("Timeout: openSocketTimeoutWait")); - } - finally { - firstSpi().resetState(); - } - } - - - /** - * @throws Exception In case of error. - */ - public void testFailureDetectionOnSocketWrite() throws Exception { - try { - ClusterNode node = secondSpi().getLocalNode(); - - firstSpi().writeToSocketTimeoutWait = true; - - assertFalse(firstSpi().pingNode(node.id())); - assertTrue(firstSpi().validTimeout); - - firstSpi().writeToSocketTimeoutWait = false; - - assertTrue(firstSpi().pingNode(node.id())); - assertTrue(firstSpi().validTimeout); - } - finally { - firstSpi().resetState(); - } - } - - /** - * @throws Exception In case of error. - */ - public void testConnectionCheckMessage() throws Exception { - TestTcpDiscoverySpi nextSpi = null; - - try { - assert firstSpi().connCheckStatusMsgCntSent == 0; - - TcpDiscoveryNode nextNode = ((ServerImpl)(firstSpi().impl)).ring().nextNode(); - - assertNotNull(nextNode); - - nextSpi = null; - - for (int i = 1; i < spis.size(); i++) - if (spis.get(i).getLocalNode().id().equals(nextNode.id())) { - nextSpi = (TestTcpDiscoverySpi)spis.get(i); - break; - } - - assertNotNull(nextSpi); - - assert nextSpi.connCheckStatusMsgCntReceived == 0; - - firstSpi().countConnCheckMsg = true; - nextSpi.countConnCheckMsg = true; - - Thread.sleep(firstSpi().failureDetectionThreshold()); - - firstSpi().countConnCheckMsg = false; - nextSpi.countConnCheckMsg = false; - - int sent = firstSpi().connCheckStatusMsgCntSent; - int received = nextSpi.connCheckStatusMsgCntReceived; - - assert sent >= 3 && sent < 7 : "messages sent: " + sent; - assert received >= 3 && received < 7 : "messages received: " + received; - } - finally { - firstSpi().resetState(); - - if (nextSpi != null) - nextSpi.resetState(); - } - } - - /** - * @throws Exception In case of error. - */ - public void testConnectionCheckMessageBackwardCompatibility() throws Exception { - TestTcpDiscoverySpi nextSpi = null; - TcpDiscoveryNode nextNode = null; - - IgniteProductVersion nextNodeVer = null; - - try { - assert firstSpi().connCheckStatusMsgCntSent == 0; - - nextNode = ((ServerImpl)(firstSpi().impl)).ring().nextNode(); - - assertNotNull(nextNode); - - nextSpi = null; - - for (int i = 1; i < spis.size(); i++) - if (spis.get(i).getLocalNode().id().equals(nextNode.id())) { - nextSpi = (TestTcpDiscoverySpi)spis.get(i); - break; - } - - assertNotNull(nextSpi); - - assert nextSpi.connCheckStatusMsgCntReceived == 0; - - nextNodeVer = nextNode.version(); - - // Overriding the version of the next node. Connection check message must not been sent to it. - nextNode.version(new IgniteProductVersion(TcpDiscoverySpi.FAILURE_DETECTION_MAJOR_VER, - (byte)(TcpDiscoverySpi.FAILURE_DETECTION_MINOR_VER - 1), TcpDiscoverySpi.FAILURE_DETECTION_MAINT_VER, - 0l, null)); - - firstSpi().countConnCheckMsg = true; - nextSpi.countConnCheckMsg = true; - - Thread.sleep(firstSpi().failureDetectionThreshold() / 2); - - firstSpi().countConnCheckMsg = false; - nextSpi.countConnCheckMsg = false; - - int sent = firstSpi().connCheckStatusMsgCntSent; - int received = nextSpi.connCheckStatusMsgCntReceived; - - assert sent == 0 : "messages sent: " + sent; - assert received == 0 : "messages received: " + received; - } - finally { - firstSpi().resetState(); - - if (nextSpi != null) - nextSpi.resetState(); - - if (nextNode != null && nextNodeVer != null) - nextNode.version(nextNodeVer); - } - } - - /** - * Returns the first spi with failure detection threshold enabled. - * - * @return SPI. - */ - private TestTcpDiscoverySpi firstSpi() { - return (TestTcpDiscoverySpi)spis.get(0); - } - - - /** - * Returns the second spi with failure detection threshold enabled. - * - * @return SPI. - */ - private TestTcpDiscoverySpi secondSpi() { - return (TestTcpDiscoverySpi)spis.get(1); - } - - /** - * - */ - private static class TestTcpDiscoverySpi extends TcpDiscoverySpi { - /** */ - private volatile boolean openSocketTimeout; - - /** */ - private volatile boolean openSocketTimeoutWait; - - /** */ - private volatile boolean writeToSocketTimeoutWait; - - /** */ - private volatile boolean countConnCheckMsg; - - /** */ - private volatile int connCheckStatusMsgCntSent; - - /** */ - private volatile int connCheckStatusMsgCntReceived; - - /** */ - private volatile boolean validTimeout = true; - - /** */ - private volatile IgniteSpiOperationTimeoutException err; - - - /** {@inheritDoc} */ - @Override protected Socket openSocket(InetSocketAddress sockAddr, IgniteSpiOperationTimeoutHelper timeoutHelper) - throws IOException, IgniteSpiOperationTimeoutException { - - if (openSocketTimeout) { - err = new IgniteSpiOperationTimeoutException("Timeout: openSocketTimeout"); - throw err; - } - else if (openSocketTimeoutWait) { - long timeout = timeoutHelper.nextTimeoutChunk(0); - - try { - Thread.sleep(timeout + 1000); - } - catch (InterruptedException e) { - // Ignore - } - - try { - timeoutHelper.nextTimeoutChunk(0); - } - catch (IgniteSpiOperationTimeoutException e) { - throw (err = new IgniteSpiOperationTimeoutException("Timeout: openSocketTimeoutWait")); - } - } - - Socket sock = super.openSocket(sockAddr, timeoutHelper); - - try { - Thread.sleep(1500); - } catch (InterruptedException e) { - // Ignore - } - - return sock; - } - - /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) - throws IOException, IgniteCheckedException { - if (!(msg instanceof TcpDiscoveryPingRequest)) { - super.writeToSocket(sock, msg, timeout); - return; - } - - if (timeout >= IgniteConfiguration.DFLT_FAILURE_DETECTION_THRESHOLD) { - validTimeout = false; - - throw new IgniteCheckedException("Invalid timeout: " + timeout); - } - - if (writeToSocketTimeoutWait) { - try { - Thread.sleep(timeout); - } - catch (InterruptedException e) { - // Ignore - } - } - else - super.writeToSocket(sock, msg, timeout); - } - - /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, - GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { - if (countConnCheckMsg && msg instanceof TcpDiscoveryConnectionCheckMessage) - connCheckStatusMsgCntSent++; - - super.writeToSocket(sock, msg, bout, timeout); - } - - /** {@inheritDoc} */ - protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res, long timeout) - throws IOException { - if (countConnCheckMsg && msg instanceof TcpDiscoveryConnectionCheckMessage) - connCheckStatusMsgCntReceived++; - - super.writeToSocket(msg, sock, res, timeout); - } - - /** - * - */ - private void resetState() { - openSocketTimeout = false; - openSocketTimeoutWait = false; - writeToSocketTimeoutWait = false; - err = null; - validTimeout = true; - connCheckStatusMsgCntSent = 0; - connCheckStatusMsgCntReceived = 0; - countConnCheckMsg = false; - } - } -}
