ignite-752: add failure detection threshold support to TcpCommunicationSpi
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/0949c93f Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/0949c93f Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/0949c93f Branch: refs/heads/ignite-752 Commit: 0949c93fe1d1dd435b906fb1d79263e9073c0f1b Parents: c3c0ef8 Author: Denis Magda <dma...@gridgain.com> Authored: Thu Jul 16 13:25:48 2015 +0300 Committer: Denis Magda <dma...@gridgain.com> Committed: Thu Jul 16 13:25:48 2015 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/spi/IgniteSpiAdapter.java | 20 ++-- .../IgniteSpiOperationTimeoutController.java | 7 +- .../communication/tcp/TcpCommunicationSpi.java | 106 +++++++++++++++---- .../spi/discovery/tcp/TcpDiscoverySpi.java | 2 +- 4 files changed, 102 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0949c93f/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java index 422ce81..500c461 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java @@ -203,15 +203,6 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement spiCtx = new GridDummySpiContext(locNode, true, spiCtx); } - /** {@inheritDoc} */ - @Override public void spiStart(@Nullable String gridName) throws IgniteSpiException { - if (!failureDetectionThresholdEnabled) { - failureDetectionThreshold = ignite.configuration().getFailureDetectionThreshold(); - - assertParameter(failureDetectionThreshold > 0, 
"failureDetectionThreshold > 0"); - } - } - /** * Inject ignite instance. */ @@ -579,6 +570,17 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement /** * TODO: IGNITE-752 + */ + protected void initFailureDetectionThreshold() { + if (failureDetectionThresholdEnabled) { + failureDetectionThreshold = ignite.configuration().getFailureDetectionThreshold(); + + assertParameter(failureDetectionThreshold > 0, "failureDetectionThreshold > 0"); + } + } + + /** + * TODO: IGNITE-752 * @param enabled */ public void failureDetectionThresholdEnabled(boolean enabled) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0949c93f/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java index 3ae4fa4..ba95871 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java @@ -46,6 +46,9 @@ public class IgniteSpiOperationTimeoutController { public IgniteSpiOperationTimeoutController(IgniteSpiAdapter adapter) { failureDetectionThresholdEnabled = adapter.failureDetectionThresholdEnabled(); failureDetectionThreshold = adapter.failureDetectionThreshold(); + + assert !failureDetectionThresholdEnabled || failureDetectionThreshold > 0 : " [failureDetectionThreshold=" + + failureDetectionThreshold + ", thresholdEnabled=" + failureDetectionThresholdEnabled + ']'; } /** @@ -70,8 +73,8 @@ public class IgniteSpiOperationTimeoutController { lastOperStartTs = curTs; if (timeout <= 0) - throw new IgniteSpiOperationTimeoutException("Network operation timed out. 
Increase failure detection" + - " threshold using IgniteConfiguration.setFailureDetectionThreshold() or set SPI specific timeouts" + + throw new IgniteSpiOperationTimeoutException("Network operation timed out. Increase " + + "'failureDetectionThreshold' configuration property or set SPI specific timeouts" + " manually. Current failure detection threshold: " + failureDetectionThreshold); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0949c93f/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 4ca2995..b2bc9eb 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -970,6 +970,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter @IgniteSpiConfiguration(optional = true) public void setConnectTimeout(long connTimeout) { this.connTimeout = connTimeout; + + failureDetectionThresholdEnabled(false); } /** {@inheritDoc} */ @@ -992,6 +994,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter @IgniteSpiConfiguration(optional = true) public void setMaxConnectTimeout(long maxConnTimeout) { this.maxConnTimeout = maxConnTimeout; + + failureDetectionThresholdEnabled(false); } /** {@inheritDoc} */ @@ -1010,6 +1014,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter @IgniteSpiConfiguration(optional = true) public void setReconnectCount(int reconCnt) { this.reconCnt = reconCnt; + + failureDetectionThresholdEnabled(false); } /** {@inheritDoc} */ @@ -1239,6 +1245,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter @Override public Map<String, Object> getNodeAttributes() 
throws IgniteSpiException { nodeIdMsg = new NodeIdMessage(getLocalNodeId()); + initFailureDetectionThreshold(); + assertParameter(locPort > 1023, "locPort > 1023"); assertParameter(locPort <= 0xffff, "locPort < 0xffff"); assertParameter(locPortRange >= 0, "locPortRange >= 0"); @@ -1247,10 +1255,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter assertParameter(sockSndBuf >= 0, "sockSndBuf >= 0"); assertParameter(msgQueueLimit >= 0, "msgQueueLimit >= 0"); assertParameter(shmemPort > 0 || shmemPort == -1, "shmemPort > 0 || shmemPort == -1"); - assertParameter(reconCnt > 0, "reconnectCnt > 0"); assertParameter(selectorsCnt > 0, "selectorsCnt > 0"); - assertParameter(connTimeout >= 0, "connTimeout >= 0"); - assertParameter(maxConnTimeout >= connTimeout, "maxConnTimeout >= connTimeout"); + + if (!failureDetectionThresholdEnabled()) { + assertParameter(reconCnt > 0, "reconnectCnt > 0"); + assertParameter(connTimeout >= 0, "connTimeout >= 0"); + assertParameter(maxConnTimeout >= connTimeout, "maxConnTimeout >= connTimeout"); + } + assertParameter(sockWriteTimeout >= 0, "sockWriteTimeout >= 0"); assertParameter(ackSndThreshold > 0, "ackSndThreshold > 0"); assertParameter(unackedMsgsBufSize >= 0, "unackedMsgsBufSize >= 0"); @@ -1260,7 +1272,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter "Specified 'unackedMsgsBufSize' is too low, it should be at least 'msgQueueLimit * 5'."); assertParameter(unackedMsgsBufSize >= ackSndThreshold * 5, - "Specified 'unackedMsgsBufSize' is too low, it should be at least 'ackSndThreshold * 5'."); + "Specified 'unackedMsgsBufSize' is too low, it should be at least 'ackSndThreshold * 5'."); } try { @@ -1326,9 +1338,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter log.debug(configInfo("sockRcvBuf", sockRcvBuf)); log.debug(configInfo("shmemPort", shmemPort)); log.debug(configInfo("msgQueueLimit", msgQueueLimit)); - log.debug(configInfo("connTimeout", connTimeout)); - log.debug(configInfo("maxConnTimeout", 
maxConnTimeout)); - log.debug(configInfo("reconCnt", reconCnt)); + + if (failureDetectionThresholdEnabled()) { + log.debug(configInfo("connTimeout", connTimeout)); + log.debug(configInfo("maxConnTimeout", maxConnTimeout)); + log.debug(configInfo("reconCnt", reconCnt)); + } + else + log.debug(configInfo("failureDetectionThreshold", failureDetectionThreshold())); + log.debug(configInfo("sockWriteTimeout", sockWriteTimeout)); log.debug(configInfo("ackSndThreshold", ackSndThreshold)); log.debug(configInfo("unackedMsgsBufSize", unackedMsgsBufSize)); @@ -1850,17 +1868,22 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter long connTimeout0 = connTimeout; + IgniteSpiOperationTimeoutController timeoutCtrl = new IgniteSpiOperationTimeoutController(this); + while (true) { GridCommunicationClient client; try { client = new GridShmemCommunicationClient(metricsLsnr, port, - connTimeout, + timeoutCtrl.nextTimeoutChunk(connTimeout), log, getSpiContext().messageFormatter()); } catch (IgniteCheckedException e) { + if (timeoutCtrl.checkFailureDetectionThresholdReached(e)) + throw e; + // Reconnect for the second time, if connection is not established. 
if (connectAttempts < 2 && X.hasCause(e, ConnectException.class)) { connectAttempts++; @@ -1872,15 +1895,25 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } try { - safeHandshake(client, null, node.id(), connTimeout0); + safeHandshake(client, null, node.id(), timeoutCtrl.nextTimeoutChunk(connTimeout0)); } - catch (HandshakeTimeoutException e) { + catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) { + client.forceClose(); + + if (failureDetectionThresholdEnabled() && (e instanceof HandshakeTimeoutException || + timeoutCtrl.checkFailureDetectionThresholdReached(e))) { + log.debug("Handshake timed out (failure threshold reached) [failureDetectionThreshold=" + + failureDetectionThreshold() + ", err=" + e.getMessage() + ", client=" + client + ']'); + + throw e; + } + + assert !failureDetectionThresholdEnabled(); + if (log.isDebugEnabled()) - log.debug("Handshake timedout (will retry with increased timeout) [timeout=" + connTimeout0 + + log.debug("Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 + ", err=" + e.getMessage() + ", client=" + client + ']'); - client.forceClose(); - if (attempt == reconCnt || connTimeout0 > maxConnTimeout) { if (log.isDebugEnabled()) log.debug("Handshake timedout (will stop attempts to perform the handshake) " + @@ -1889,8 +1922,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter ", err=" + e.getMessage() + ", client=" + client + ']'); throw e; - } - else { + } else { attempt++; connTimeout0 *= 2; @@ -1994,6 +2026,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter int attempt = 1; + IgniteSpiOperationTimeoutController timeoutCtrl = new IgniteSpiOperationTimeoutController(this); + while (!conn) { // Reconnection on handshake timeout. 
try { SocketChannel ch = SocketChannel.open(); @@ -2020,9 +2054,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter long rcvCnt = -1; try { - ch.socket().connect(addr, (int)connTimeout); + ch.socket().connect(addr, (int)timeoutCtrl.nextTimeoutChunk(connTimeout)); - rcvCnt = safeHandshake(ch, recoveryDesc, node.id(), connTimeout0); + rcvCnt = safeHandshake(ch, recoveryDesc, node.id(), timeoutCtrl.nextTimeoutChunk(connTimeout0)); if (rcvCnt == -1) return null; @@ -2056,14 +2090,38 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } } } - catch (HandshakeTimeoutException e) { + catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) { if (client != null) { client.forceClose(); client = null; } - onException("Handshake timedout (will retry with increased timeout) [timeout=" + connTimeout0 + + if (failureDetectionThresholdEnabled() && (e instanceof HandshakeTimeoutException || + timeoutCtrl.checkFailureDetectionThresholdReached(e))) { + + String msg = "Handshake timed out (failure detection threshold is reached) " + + "[failureDetectionThreshold=" + failureDetectionThreshold() + ", addr=" + addr + ']'; + + onException(msg, e); + + if (log.isDebugEnabled()) + log.debug(msg); + + if (errs == null) + errs = new IgniteCheckedException("Failed to connect to node (is node still alive?). 
" + + "Make sure that each GridComputeTask and GridCacheTransaction has a timeout set " + + "in order to prevent parties from waiting forever in case of network issues " + + "[nodeId=" + node.id() + ", addrs=" + addrs + ']'); + + errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e)); + + break; + } + + assert !failureDetectionThresholdEnabled(); + + onException("Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 + ", addr=" + addr + ']', e); if (log.isDebugEnabled()) @@ -2108,7 +2166,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (log.isDebugEnabled()) log.debug("Client creation failed [addr=" + addr + ", err=" + e + ']'); - if (X.hasCause(e, SocketTimeoutException.class)) + boolean failureDetThrReached = timeoutCtrl.checkFailureDetectionThresholdReached(e); + + if (failureDetThrReached) + LT.warn(log, null, "Connect timed out (consider increasing 'failureDetectionThreshold' " + + "configuration property) [addr=" + addr + ", failureDetectionThreshold=" + + failureDetectionThreshold() + ']'); + else if (X.hasCause(e, SocketTimeoutException.class)) LT.warn(log, null, "Connect timed out (consider increasing 'connTimeout' " + "configuration property) [addr=" + addr + ", connTimeout=" + connTimeout + ']'); @@ -2121,7 +2185,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e)); // Reconnect for the second time, if connection is not established. 
- if (connectAttempts < 2 && + if (!failureDetThrReached && connectAttempts < 2 && (e instanceof ConnectException || X.hasCause(e, ConnectException.class))) { connectAttempts++; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0949c93f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 5ed946b..5be7ab9 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -1560,7 +1560,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T /** {@inheritDoc} */ @Override public void spiStart(@Nullable String gridName) throws IgniteSpiException { - super.spiStart(gridName); + initFailureDetectionThreshold(); if (!failureDetectionThresholdEnabled()) { if (!forceSrvMode && (Boolean.TRUE.equals(ignite.configuration().isClientMode()))) {