master: back merge from ignite-752
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/cff25e91 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/cff25e91 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/cff25e91 Branch: refs/heads/ignite-gg-9615 Commit: cff25e91ac16fb11f3790690ec28d39a729519d9 Parents: ae148f1 Author: dmagda <magda7...@gmail.com> Authored: Fri Jul 24 15:32:51 2015 +0300 Committer: dmagda <magda7...@gmail.com> Committed: Fri Jul 24 15:32:51 2015 +0300 ---------------------------------------------------------------------- .../configuration/IgniteConfiguration.java | 35 +- .../org/apache/ignite/spi/IgniteSpiAdapter.java | 58 +++ .../spi/IgniteSpiOperationTimeoutException.java | 43 ++ .../spi/IgniteSpiOperationTimeoutHelper.java | 102 ++++ .../communication/tcp/TcpCommunicationSpi.java | 122 ++++- .../ignite/spi/discovery/tcp/ClientImpl.java | 52 +- .../ignite/spi/discovery/tcp/ServerImpl.java | 509 +++++++++++-------- .../spi/discovery/tcp/TcpDiscoveryImpl.java | 11 +- .../spi/discovery/tcp/TcpDiscoverySpi.java | 135 +++-- .../tcp/internal/TcpDiscoveryNode.java | 21 + .../TcpDiscoveryConnectionCheckMessage.java | 64 +++ .../IgniteClientReconnectAbstractTest.java | 4 +- .../GridTcpCommunicationSpiAbstractTest.java | 2 +- ...dTcpCommunicationSpiRecoveryAckSelfTest.java | 3 +- ...tionSpiRecoveryFailureDetectionSelfTest.java | 54 ++ ...GridTcpCommunicationSpiRecoverySelfTest.java | 23 +- ...unicationSpiTcpFailureDetectionSelfTest.java | 75 +++ .../discovery/AbstractDiscoverySelfTest.java | 23 +- ...lientDiscoverySpiFailureTimeoutSelfTest.java | 205 ++++++++ .../tcp/TcpClientDiscoverySpiSelfTest.java | 116 +++-- .../tcp/TcpDiscoverySpiConfigSelfTest.java | 1 + .../TcpDiscoverySpiFailureTimeoutSelfTest.java | 402 +++++++++++++++ .../IgniteSpiCommunicationSelfTestSuite.java | 3 + .../IgniteSpiDiscoverySelfTestSuite.java | 2 + 24 files changed, 1749 insertions(+), 316 
deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java index 823ddcd..aac1754 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java @@ -190,6 +190,11 @@ public class IgniteConfiguration { /** Default value for cache sanity check enabled flag. */ public static final boolean DFLT_CACHE_SANITY_CHECK_ENABLED = true; + /** Default failure detection timeout in millis. */ + @SuppressWarnings("UnnecessaryBoxing") +// public static final Long DFLT_FAILURE_DETECTION_TIMEOUT = new Long(10_000); + public static final Long DFLT_FAILURE_DETECTION_TIMEOUT = new Long(10_000); + /** Optional grid name. */ private String gridName; @@ -367,6 +372,9 @@ public class IgniteConfiguration { /** Port number range for time server. */ private int timeSrvPortRange = DFLT_TIME_SERVER_PORT_RANGE; + /** Failure detection timeout. */ + private Long failureDetectionTimeout = DFLT_FAILURE_DETECTION_TIMEOUT; + /** Property names to include into node attributes. 
*/ private String[] includeProps; @@ -449,7 +457,7 @@ public class IgniteConfiguration { consistentId = cfg.getConsistentId(); deployMode = cfg.getDeploymentMode(); discoStartupDelay = cfg.getDiscoveryStartupDelay(); - pubPoolSize = cfg.getPublicThreadPoolSize(); + failureDetectionTimeout = cfg.getFailureDetectionTimeout(); ggHome = cfg.getIgniteHome(); ggWork = cfg.getWorkDirectory(); gridName = cfg.getGridName(); @@ -479,6 +487,7 @@ public class IgniteConfiguration { p2pMissedCacheSize = cfg.getPeerClassLoadingMissedResourcesCacheSize(); p2pPoolSize = cfg.getPeerClassLoadingThreadPoolSize(); pluginCfgs = cfg.getPluginConfigurations(); + pubPoolSize = cfg.getPublicThreadPoolSize(); segChkFreq = cfg.getSegmentCheckFrequency(); segPlc = cfg.getSegmentationPolicy(); segResolveAttempts = cfg.getSegmentationResolveAttempts(); @@ -1655,6 +1664,30 @@ public class IgniteConfiguration { } /** + * Returns failure detection timeout used by {@link TcpDiscoverySpi} and {@link TcpCommunicationSpi}. + * <p> + * Default is {@link #DFLT_FAILURE_DETECTION_TIMEOUT}. + * + * @see #setFailureDetectionTimeout(long) + * @return Failure detection timeout in milliseconds. + */ + public Long getFailureDetectionTimeout() { + return failureDetectionTimeout; + } + + /** + * Sets failure detection timeout to use in {@link TcpDiscoverySpi} and {@link TcpCommunicationSpi}. + * <p> + * Failure detection timeout is used to determine how long the communication or discovery SPIs should wait before + * considering a remote connection failed. + * + * @param failureDetectionTimeout Failure detection timeout in milliseconds. + */ + public void setFailureDetectionTimeout(long failureDetectionTimeout) { + this.failureDetectionTimeout = failureDetectionTimeout; + } + + /** * Should return fully configured load balancing SPI implementation. If not provided, * {@link RoundRobinLoadBalancingSpi} will be used. 
* http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java index 2f3def9..f809d82 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java @@ -19,6 +19,7 @@ package org.apache.ignite.spi; import org.apache.ignite.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.managers.communication.*; @@ -74,6 +75,15 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement /** Local node. */ private ClusterNode locNode; + /** Failure detection timeout usage switch. */ + private boolean failureDetectionTimeoutEnabled = true; + + /** + * Failure detection timeout. Initialized with the value of {@link IgniteConfiguration#getFailureDetectionTimeout()} + * when the failure detection timeout is enabled; otherwise it stays {@code 0}. + */ + private long failureDetectionTimeout; + /** * Creates new adapter and initializes it from the current (this) class. * SPI name will be initialized to the simple name of the class @@ -583,6 +593,54 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement } /** + * Initializes and validates the failure detection timeout value.
+ */ + protected void initFailureDetectionTimeout() { + if (failureDetectionTimeoutEnabled) { + failureDetectionTimeout = ignite.configuration().getFailureDetectionTimeout(); + + if (failureDetectionTimeout <= 0) + throw new IgniteSpiException("Invalid failure detection timeout value: " + failureDetectionTimeout); + else if (failureDetectionTimeout <= 10) + // Because U.currentTimeInMillis() is updated once in 10 milliseconds. + log.warning("Failure detection timeout is too low, it may lead to unpredictable behaviour " + + "[failureDetectionTimeout=" + failureDetectionTimeout + ']'); + } + // Intentionally compare references using '!=' below + else if (ignite.configuration().getFailureDetectionTimeout() != + IgniteConfiguration.DFLT_FAILURE_DETECTION_TIMEOUT) + log.warning("Failure detection timeout will be ignored (one of SPI parameters has been set explicitly)"); + + } + + /** + * Enables or disables failure detection timeout. + * + * @param enabled {@code true} if enable, {@code false} otherwise. + */ + public void failureDetectionTimeoutEnabled(boolean enabled) { + failureDetectionTimeoutEnabled = enabled; + } + + /** + * Checks whether failure detection timeout is enabled for this {@link IgniteSpi}. + * + * @return {@code true} if enabled, {@code false} otherwise. + */ + public boolean failureDetectionTimeoutEnabled() { + return failureDetectionTimeoutEnabled; + } + + /** + * Returns failure detection timeout set to use for network related operations. + * + * @return failure detection timeout in milliseconds or {@code 0} if the timeout is disabled. + */ + public long failureDetectionTimeout() { + return failureDetectionTimeout; + } + + /** * Temporarily SPI context. 
*/ private class GridDummySpiContext implements IgniteSpiContext { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java new file mode 100644 index 0000000..0e34cf2 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi; + +import org.apache.ignite.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.communication.tcp.*; +import org.apache.ignite.configuration.*; + +/** + * Kind of exception that is used when failure detection timeout is enabled for {@link TcpDiscoverySpi} or + * {@link TcpCommunicationSpi}. + * + * For more information refer to {@link IgniteConfiguration#setFailureDetectionTimeout(long)} and + * {@link IgniteSpiOperationTimeoutHelper}. 
+ */ +public class IgniteSpiOperationTimeoutException extends IgniteCheckedException { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Constructor. + * @param msg Error message. + */ + public IgniteSpiOperationTimeoutException(String msg) { + super(msg); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java new file mode 100644 index 0000000..f7d8daa --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ignite.spi; + +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.net.*; + +/** + * Object that incorporates logic that determines a timeout value for the next network related operation and checks + * whether a failure detection timeout is reached or not. + * + * A new instance of the class should be created for every complex network-based operation, which usually consists of + * request and response parts. + */ +public class IgniteSpiOperationTimeoutHelper { + /** */ + private long lastOperStartTs; + + /** */ + private long timeout; + + /** */ + private final boolean failureDetectionTimeoutEnabled; + + /** */ + private final long failureDetectionTimeout; + + /** + * Constructor. + * + * @param adapter SPI adapter. + */ + public IgniteSpiOperationTimeoutHelper(IgniteSpiAdapter adapter) { + failureDetectionTimeoutEnabled = adapter.failureDetectionTimeoutEnabled(); + failureDetectionTimeout = adapter.failureDetectionTimeout(); + } + + /** + * Returns a timeout value to use for the next network operation. + * + * If failure detection timeout is enabled then the returned value is the part of the failure detection timeout + * that remains, counting from the first call of this method. If the timeout is disabled then {@code dfltTimeout} is returned. + * + * @param dfltTimeout Timeout to use if failure detection timeout is disabled. + * @return Timeout in milliseconds. + * @throws IgniteSpiOperationTimeoutException If failure detection timeout is reached for an operation that uses + * this {@code IgniteSpiOperationTimeoutHelper}.
+ */ + public long nextTimeoutChunk(long dfltTimeout) throws IgniteSpiOperationTimeoutException { + if (!failureDetectionTimeoutEnabled) + return dfltTimeout; + + if (lastOperStartTs == 0) { + timeout = failureDetectionTimeout; + lastOperStartTs = U.currentTimeMillis(); + } + else { + long curTs = U.currentTimeMillis(); + + timeout = timeout - (curTs - lastOperStartTs); + + lastOperStartTs = curTs; + + if (timeout <= 0) + throw new IgniteSpiOperationTimeoutException("Network operation timed out. Increase " + + "'failureDetectionTimeout' configuration property [failureDetectionTimeout=" + + failureDetectionTimeout + ']'); + } + + return timeout; + } + + /** + * Checks whether the given {@link Exception} is generated because failure detection timeout has been reached. + * + * @param e Exception. + * @return {@code true} if failure detection timeout is reached, {@code false} otherwise. + */ + public boolean checkFailureTimeoutReached(Exception e) { + if (!failureDetectionTimeoutEnabled) + return false; + + return e instanceof IgniteSpiOperationTimeoutException || e instanceof SocketTimeoutException || + X.hasCause(e, IgniteSpiOperationTimeoutException.class, SocketException.class); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index e9fd696..7be1dbc 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -73,7 +73,21 @@ import static org.apache.ignite.events.EventType.*; * {@link #DFLT_IDLE_CONN_TIMEOUT} period and then are closed. 
Use * {@link #setIdleConnectionTimeout(long)} configuration parameter to configure * you own idle connection timeout. + * <h1 class="header">Failure Detection</h1> + * Configuration defaults (see the Configuration section below and + * {@link IgniteConfiguration#getFailureDetectionTimeout()} for details) are chosen to make it possible for the + * communication SPI to work reliably on most hardware and virtual deployments, but this has made failure detection + * time worse. * <p> + * If failure detection needs tuning, it is highly recommended to do this using + * {@link IgniteConfiguration#setFailureDetectionTimeout(long)}. This failure timeout automatically controls the + * following parameters: {@link #getConnectTimeout()}, {@link #getMaxConnectTimeout()}, + * {@link #getReconnectCount()}. If any of those parameters is set explicitly, then the failure timeout setting will be + * ignored. + * <p> + * If advanced tuning of failure detection is required and + * {@link IgniteConfiguration#getFailureDetectionTimeout()} is unsuitable, then various {@code TcpCommunicationSpi} + * configuration parameters may be used. * <h1 class="header">Configuration</h1> * <h2 class="header">Mandatory</h2> * This SPI has no mandatory configuration parameters. @@ -991,12 +1005,16 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * {@code 0} is interpreted as infinite timeout. * <p> * If not provided, default value is {@link #DFLT_CONN_TIMEOUT}. + * <p> + * When this property is explicitly set, {@link IgniteConfiguration#getFailureDetectionTimeout()} is ignored. * * @param connTimeout Connect timeout. */ @IgniteSpiConfiguration(optional = true) public void setConnectTimeout(long connTimeout) { this.connTimeout = connTimeout; + + failureDetectionTimeoutEnabled(false); } /** {@inheritDoc} */ @@ -1013,12 +1031,16 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * {@code 0} is interpreted as infinite timeout.
* <p> * If not provided, default value is {@link #DFLT_MAX_CONN_TIMEOUT}. + * <p> + * When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionTimeout()} is ignored. * * @param maxConnTimeout Maximum connect timeout. */ @IgniteSpiConfiguration(optional = true) public void setMaxConnectTimeout(long maxConnTimeout) { this.maxConnTimeout = maxConnTimeout; + + failureDetectionTimeoutEnabled(false); } /** {@inheritDoc} */ @@ -1031,12 +1053,16 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * with remote nodes. * <p> * If not provided, default value is {@link #DFLT_RECONNECT_CNT}. + * <p> + * When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionTimeout()} is ignored. * * @param reconCnt Maximum number of reconnection attempts. */ @IgniteSpiConfiguration(optional = true) public void setReconnectCount(int reconCnt) { this.reconCnt = reconCnt; + + failureDetectionTimeoutEnabled(false); } /** {@inheritDoc} */ @@ -1264,6 +1290,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** {@inheritDoc} */ @Override public Map<String, Object> getNodeAttributes() throws IgniteSpiException { + initFailureDetectionTimeout(); + assertParameter(locPort > 1023, "locPort > 1023"); assertParameter(locPort <= 0xffff, "locPort < 0xffff"); assertParameter(locPortRange >= 0, "locPortRange >= 0"); @@ -1272,10 +1300,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter assertParameter(sockSndBuf >= 0, "sockSndBuf >= 0"); assertParameter(msgQueueLimit >= 0, "msgQueueLimit >= 0"); assertParameter(shmemPort > 0 || shmemPort == -1, "shmemPort > 0 || shmemPort == -1"); - assertParameter(reconCnt > 0, "reconnectCnt > 0"); assertParameter(selectorsCnt > 0, "selectorsCnt > 0"); - assertParameter(connTimeout >= 0, "connTimeout >= 0"); - assertParameter(maxConnTimeout >= connTimeout, "maxConnTimeout >= connTimeout"); + + if (!failureDetectionTimeoutEnabled()) { + assertParameter(reconCnt > 0, "reconnectCnt > 0"); + 
assertParameter(connTimeout >= 0, "connTimeout >= 0"); + assertParameter(maxConnTimeout >= connTimeout, "maxConnTimeout >= connTimeout"); + } + assertParameter(sockWriteTimeout >= 0, "sockWriteTimeout >= 0"); assertParameter(ackSndThreshold > 0, "ackSndThreshold > 0"); assertParameter(unackedMsgsBufSize >= 0, "unackedMsgsBufSize >= 0"); @@ -1351,9 +1383,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter log.debug(configInfo("sockRcvBuf", sockRcvBuf)); log.debug(configInfo("shmemPort", shmemPort)); log.debug(configInfo("msgQueueLimit", msgQueueLimit)); - log.debug(configInfo("connTimeout", connTimeout)); - log.debug(configInfo("maxConnTimeout", maxConnTimeout)); - log.debug(configInfo("reconCnt", reconCnt)); + + if (failureDetectionTimeoutEnabled()) { + log.debug(configInfo("connTimeout", connTimeout)); + log.debug(configInfo("maxConnTimeout", maxConnTimeout)); + log.debug(configInfo("reconCnt", reconCnt)); + } + else + log.debug(configInfo("failureDetectionTimeout", failureDetectionTimeout())); + log.debug(configInfo("sockWriteTimeout", sockWriteTimeout)); log.debug(configInfo("ackSndThreshold", ackSndThreshold)); log.debug(configInfo("unackedMsgsBufSize", unackedMsgsBufSize)); @@ -1906,17 +1944,22 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter long connTimeout0 = connTimeout; + IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(this); + while (true) { GridCommunicationClient client; try { client = new GridShmemCommunicationClient(metricsLsnr, port, - connTimeout, + timeoutHelper.nextTimeoutChunk(connTimeout), log, getSpiContext().messageFormatter()); } catch (IgniteCheckedException e) { + if (timeoutHelper.checkFailureTimeoutReached(e)) + throw e; + // Reconnect for the second time, if connection is not established. 
if (connectAttempts < 2 && X.hasCause(e, ConnectException.class)) { connectAttempts++; @@ -1928,15 +1971,25 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } try { - safeHandshake(client, null, node.id(), connTimeout0); + safeHandshake(client, null, node.id(), timeoutHelper.nextTimeoutChunk(connTimeout0)); } - catch (HandshakeTimeoutException e) { + catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) { + client.forceClose(); + + if (failureDetectionTimeoutEnabled() && (e instanceof HandshakeTimeoutException || + timeoutHelper.checkFailureTimeoutReached(e))) { + log.debug("Handshake timed out (failure threshold reached) [failureDetectionTimeout=" + + failureDetectionTimeout() + ", err=" + e.getMessage() + ", client=" + client + ']'); + + throw e; + } + + assert !failureDetectionTimeoutEnabled(); + if (log.isDebugEnabled()) - log.debug("Handshake timedout (will retry with increased timeout) [timeout=" + connTimeout0 + + log.debug("Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 + ", err=" + e.getMessage() + ", client=" + client + ']'); - client.forceClose(); - if (attempt == reconCnt || connTimeout0 > maxConnTimeout) { if (log.isDebugEnabled()) log.debug("Handshake timedout (will stop attempts to perform the handshake) " + @@ -2050,6 +2103,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter int attempt = 1; + IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(this); + while (!conn) { // Reconnection on handshake timeout. 
try { SocketChannel ch = SocketChannel.open(); @@ -2076,9 +2131,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter long rcvCnt = -1; try { - ch.socket().connect(addr, (int)connTimeout); + ch.socket().connect(addr, (int)timeoutHelper.nextTimeoutChunk(connTimeout)); - rcvCnt = safeHandshake(ch, recoveryDesc, node.id(), connTimeout0); + rcvCnt = safeHandshake(ch, recoveryDesc, node.id(), + timeoutHelper.nextTimeoutChunk(connTimeout0)); if (rcvCnt == -1) return null; @@ -2112,19 +2168,43 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } } } - catch (HandshakeTimeoutException e) { + catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) { if (client != null) { client.forceClose(); client = null; } + if (failureDetectionTimeoutEnabled() && (e instanceof HandshakeTimeoutException || + timeoutHelper.checkFailureTimeoutReached(e))) { + + String msg = "Handshake timed out (failure detection timeout is reached) " + + "[failureDetectionTimeout=" + failureDetectionTimeout() + ", addr=" + addr + ']'; + + onException(msg, e); + + if (log.isDebugEnabled()) + log.debug(msg); + + if (errs == null) + errs = new IgniteCheckedException("Failed to connect to node (is node still alive?). 
" + + "Make sure that each GridComputeTask and GridCacheTransaction has a timeout set " + + "in order to prevent parties from waiting forever in case of network issues " + + "[nodeId=" + node.id() + ", addrs=" + addrs + ']'); + + errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e)); + + break; + } + + assert !failureDetectionTimeoutEnabled(); + onException("Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 + ", addr=" + addr + ']', e); if (log.isDebugEnabled()) log.debug( - "Handshake timedout (will retry with increased timeout) [timeout=" + connTimeout0 + + "Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 + ", addr=" + addr + ", err=" + e + ']'); if (attempt == reconCnt || connTimeout0 > maxConnTimeout) { @@ -2164,7 +2244,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (log.isDebugEnabled()) log.debug("Client creation failed [addr=" + addr + ", err=" + e + ']'); - if (X.hasCause(e, SocketTimeoutException.class)) + boolean failureDetThrReached = timeoutHelper.checkFailureTimeoutReached(e); + + if (failureDetThrReached) + LT.warn(log, null, "Connect timed out (consider increasing 'failureDetectionTimeout' " + + "configuration property) [addr=" + addr + ", failureDetectionTimeout=" + + failureDetectionTimeout() + ']'); + else if (X.hasCause(e, SocketTimeoutException.class)) LT.warn(log, null, "Connect timed out (consider increasing 'connTimeout' " + "configuration property) [addr=" + addr + ", connTimeout=" + connTimeout + ']'); @@ -2177,7 +2263,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e)); // Reconnect for the second time, if connection is not established. 
- if (connectAttempts < 2 && + if (!failureDetThrReached && connectAttempts < 2 && (e instanceof ConnectException || X.hasCause(e, ConnectException.class))) { connectAttempts++; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 572ba2c..12b10b8 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
@@ -480,13 +480,17 @@ class ClientImpl extends TcpDiscoveryImpl { Collection<Throwable> errs = null; - long ackTimeout0 = spi.ackTimeout; + long ackTimeout0 = spi.getAckTimeout(); + + int reconCnt = 0; int connectAttempts = 1; UUID locNodeId = getLocalNodeId(); - for (int i = 0; i < spi.reconCnt; i++) { + IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi); + + while (true) { boolean openSock = false; Socket sock = null; @@ -494,7 +498,7 @@ class ClientImpl extends TcpDiscoveryImpl { try { long tstamp = U.currentTimeMillis(); - sock = spi.openSocket(addr); + sock = spi.openSocket(addr, timeoutHelper); openSock = true; @@ -502,7 +506,7 @@ class ClientImpl extends TcpDiscoveryImpl { req.client(true); - spi.writeToSocket(sock, req); + spi.writeToSocket(sock, req, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0); @@ -532,7 +536,7 @@ class ClientImpl extends TcpDiscoveryImpl { msg.client(true); - spi.writeToSocket(sock, msg); + spi.writeToSocket(sock, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp); @@ -540,7 +544,8 @@ class ClientImpl extends TcpDiscoveryImpl { log.debug("Message has been sent to address [msg=" + msg + ", addr=" + addr + ", rmtNodeId=" + rmtNodeId + ']'); - return new T3<>(sock, spi.readReceipt(sock, ackTimeout0), res.clientAck()); + return new T3<>(sock, spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)), + res.clientAck()); } catch (IOException | IgniteCheckedException e) { U.closeQuiet(sock); @@ -555,6 +560,12 @@ class ClientImpl extends TcpDiscoveryImpl { errs.add(e); + if (timeoutHelper.checkFailureTimeoutReached(e)) + break; + + if (!spi.failureDetectionTimeoutEnabled() && ++reconCnt == spi.getReconnectCount()) + break; + if (!openSock) { // Reconnect for the second time, if connection is not established. 
if (connectAttempts < 2) { @@ -566,7 +577,8 @@ class ClientImpl extends TcpDiscoveryImpl { break; // Don't retry if we can not establish connection. } - if (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class)) { + if (!spi.failureDetectionTimeoutEnabled() && (e instanceof SocketTimeoutException || + X.hasCause(e, SocketTimeoutException.class))) { ackTimeout0 *= 2; if (!checkAckTimeout(ackTimeout0)) @@ -868,6 +880,9 @@ class ClientImpl extends TcpDiscoveryImpl { private final Queue<TcpDiscoveryAbstractMessage> queue = new ArrayDeque<>(); /** */ + private final long socketTimeout; + + /** */ private TcpDiscoveryAbstractMessage unackedMsg; /** @@ -875,6 +890,9 @@ class ClientImpl extends TcpDiscoveryImpl { */ protected SocketWriter() { super(spi.ignite().name(), "tcp-client-disco-sock-writer", log); + + socketTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() : + spi.getSocketTimeout(); } /** @@ -968,12 +986,13 @@ class ClientImpl extends TcpDiscoveryImpl { } } - spi.writeToSocket(sock, msg); + spi.writeToSocket(sock, msg, socketTimeout); msg = null; if (ack) { - long waitEnd = U.currentTimeMillis() + spi.ackTimeout; + long waitEnd = U.currentTimeMillis() + (spi.failureDetectionTimeoutEnabled() ? + spi.failureDetectionTimeout() : spi.getAckTimeout()); TcpDiscoveryAbstractMessage unacked; @@ -989,7 +1008,10 @@ class ClientImpl extends TcpDiscoveryImpl { if (unacked != null) { if (log.isDebugEnabled()) log.debug("Failed to get acknowledge for message, will try to reconnect " + - "[msg=" + unacked + ", timeout=" + spi.ackTimeout + ']'); + "[msg=" + unacked + + (spi.failureDetectionTimeoutEnabled() ? 
+ ", failureDetectionTimeout=" + spi.failureDetectionTimeout() : + ", timeout=" + spi.getAckTimeout()) + ']'); throw new IOException("Failed to get acknowledge for message: " + unacked); } @@ -1068,11 +1090,11 @@ class ClientImpl extends TcpDiscoveryImpl { if (join) { joinError(new IgniteSpiException("Join process timed out, connection failed and " + "failed to reconnect (consider increasing 'joinTimeout' configuration property) " + - "[networkTimeout=" + spi.joinTimeout + ", sock=" + sock + ']')); + "[joinTimeout=" + spi.joinTimeout + ", sock=" + sock + ']')); } else - U.error(log, "Failed to reconnect to cluster (consider increasing 'networkTimeout' " + - "configuration property) [networkTimeout=" + spi.netTimeout + ", sock=" + sock + ']'); + U.error(log, "Failed to reconnect to cluster (consider increasing 'networkTimeout'" + + " configuration property) [networkTimeout=" + spi.netTimeout + ", sock=" + sock + ']'); return; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index dc343eb..b4f89ec 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -80,14 +80,6 @@ class ServerImpl extends TcpDiscoveryImpl { /** Client message workers. */ protected ConcurrentMap<UUID, ClientMessageWorker> clientMsgWorkers = new ConcurrentHashMap8<>(); - /** Metrics sender. */ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private HeartbeatsSender hbsSnd; - - /** Status checker. 
*/ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private CheckStatusSender chkStatusSnd; - /** IP finder cleaner. */ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") private IpFinderCleaner ipFinderCleaner; @@ -229,12 +221,6 @@ class ServerImpl extends TcpDiscoveryImpl { spi.stats.onJoinFinished(); - hbsSnd = new HeartbeatsSender(); - hbsSnd.start(); - - chkStatusSnd = new CheckStatusSender(); - chkStatusSnd.start(); - if (spi.ipFinder.isShared()) { ipFinderCleaner = new IpFinderCleaner(); ipFinderCleaner.start(); @@ -278,10 +264,10 @@ class ServerImpl extends TcpDiscoveryImpl { msgWorker.addMessage(new TcpDiscoveryNodeLeftMessage(locNode.id())); synchronized (mux) { - long threshold = U.currentTimeMillis() + spi.netTimeout; - long timeout = spi.netTimeout; + long threshold = U.currentTimeMillis() + timeout; + while (spiState != LEFT && timeout > 0) { try { mux.wait(timeout); @@ -319,12 +305,6 @@ class ServerImpl extends TcpDiscoveryImpl { U.interrupt(tmp); U.joinThreads(tmp, log); - U.interrupt(hbsSnd); - U.join(hbsSnd, log); - - U.interrupt(chkStatusSnd); - U.join(chkStatusSnd, log); - U.interrupt(ipFinderCleaner); U.join(ipFinderCleaner, log); @@ -482,6 +462,8 @@ class ServerImpl extends TcpDiscoveryImpl { UUID locNodeId = getLocalNodeId(); + IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi); + if (F.contains(spi.locNodeAddrs, addr)) { if (clientNodeId == null) return F.t(getLocalNodeId(), false); @@ -494,7 +476,7 @@ class ServerImpl extends TcpDiscoveryImpl { boolean clientPingRes; try { - clientPingRes = clientWorker.ping(); + clientPingRes = clientWorker.ping(timeoutHelper); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -517,18 +499,26 @@ class ServerImpl extends TcpDiscoveryImpl { try { Socket sock = null; - for (int i = 0; i < spi.reconCnt; i++) { + int reconCnt = 0; + + boolean openedSock = false; + + while (true) { try { if (addr.isUnresolved()) addr = 
new InetSocketAddress(InetAddress.getByName(addr.getHostName()), addr.getPort()); long tstamp = U.currentTimeMillis(); - sock = spi.openSocket(addr); + sock = spi.openSocket(addr, timeoutHelper); - spi.writeToSocket(sock, new TcpDiscoveryPingRequest(locNodeId, clientNodeId)); + openedSock = true; - TcpDiscoveryPingResponse res = spi.readMessage(sock, null, spi.netTimeout); + spi.writeToSocket(sock, new TcpDiscoveryPingRequest(locNodeId, clientNodeId), + timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); + + TcpDiscoveryPingResponse res = spi.readMessage(sock, null, timeoutHelper.nextTimeoutChunk( + spi.getAckTimeout())); if (locNodeId.equals(res.creatorNodeId())) { if (log.isDebugEnabled()) @@ -550,6 +540,16 @@ class ServerImpl extends TcpDiscoveryImpl { errs = new ArrayList<>(); errs.add(e); + + reconCnt++; + + if (!openedSock && reconCnt == 2) + break; + + if (timeoutHelper.checkFailureTimeoutReached(e)) + break; + else if (!spi.failureDetectionTimeoutEnabled() && reconCnt == spi.getReconnectCount()) + break; } finally { U.closeQuiet(sock); @@ -607,6 +607,12 @@ class ServerImpl extends TcpDiscoveryImpl { } } + /** {@inheritDoc} */ + @Override protected void onDataReceived() { + if (spi.failureDetectionTimeoutEnabled() && locNode != null) + locNode.lastDataReceivedTime(U.currentTimeMillis()); + } + /** * Tries to join this node to topology. 
* @@ -678,10 +684,10 @@ class ServerImpl extends TcpDiscoveryImpl { log.debug("Join request message has been sent (waiting for coordinator response)."); synchronized (mux) { - long threshold = U.currentTimeMillis() + spi.netTimeout; - long timeout = spi.netTimeout; + long threshold = U.currentTimeMillis() + timeout; + while (spiState == CONNECTING && timeout > 0) { try { mux.wait(timeout); @@ -883,15 +889,19 @@ class ServerImpl extends TcpDiscoveryImpl { Collection<Throwable> errs = null; - long ackTimeout0 = spi.ackTimeout; + long ackTimeout0 = spi.getAckTimeout(); int connectAttempts = 1; - boolean joinReqSent = false; + boolean joinReqSent; UUID locNodeId = getLocalNodeId(); - for (int i = 0; i < spi.reconCnt; i++) { + IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi); + + int reconCnt = 0; + + while (true){ // Need to set to false on each new iteration, // since remote node may leave in the middle of the first iteration. joinReqSent = false; @@ -903,14 +913,16 @@ class ServerImpl extends TcpDiscoveryImpl { try { long tstamp = U.currentTimeMillis(); - sock = spi.openSocket(addr); + sock = spi.openSocket(addr, timeoutHelper); openSock = true; // Handshake. - spi.writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId)); + spi.writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId), timeoutHelper.nextTimeoutChunk( + spi.getSocketTimeout())); - TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0); + TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, timeoutHelper.nextTimeoutChunk( + ackTimeout0)); if (locNodeId.equals(res.creatorNodeId())) { if (log.isDebugEnabled()) @@ -924,7 +936,7 @@ class ServerImpl extends TcpDiscoveryImpl { // Send message. 
tstamp = U.currentTimeMillis(); - spi.writeToSocket(sock, msg); + spi.writeToSocket(sock, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp); @@ -941,7 +953,7 @@ class ServerImpl extends TcpDiscoveryImpl { // E.g. due to class not found issue. joinReqSent = msg instanceof TcpDiscoveryJoinRequestMessage; - return spi.readReceipt(sock, ackTimeout0); + return spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)); } catch (ClassCastException e) { // This issue is rarely reproducible on AmazonEC2, but never @@ -967,6 +979,12 @@ class ServerImpl extends TcpDiscoveryImpl { errs.add(e); + if (timeoutHelper.checkFailureTimeoutReached(e)) + break; + + if (!spi.failureDetectionTimeoutEnabled() && ++reconCnt == spi.getReconnectCount()) + break; + if (!openSock) { // Reconnect for the second time, if connection is not established. if (connectAttempts < 2) { @@ -978,7 +996,8 @@ class ServerImpl extends TcpDiscoveryImpl { break; // Don't retry if we can not establish connection. 
} - if (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class)) { + if (!spi.failureDetectionTimeoutEnabled() && (e instanceof SocketTimeoutException || + X.hasCause(e, SocketTimeoutException.class))) { ackTimeout0 *= 2; if (!checkAckTimeout(ackTimeout0)) @@ -1256,12 +1275,6 @@ class ServerImpl extends TcpDiscoveryImpl { U.interrupt(tcpSrvr); U.join(tcpSrvr, log); - U.interrupt(hbsSnd); - U.join(hbsSnd, log); - - U.interrupt(chkStatusSnd); - U.join(chkStatusSnd, log); - U.interrupt(ipFinderCleaner); U.join(ipFinderCleaner, log); @@ -1350,8 +1363,7 @@ class ServerImpl extends TcpDiscoveryImpl { b.append("Internal threads: ").append(U.nl()); b.append(" Message worker: ").append(threadStatus(msgWorker)).append(U.nl()); - b.append(" Check status sender: ").append(threadStatus(chkStatusSnd)).append(U.nl()); - b.append(" HB sender: ").append(threadStatus(hbsSnd)).append(U.nl()); + b.append(" IP finder cleaner: ").append(threadStatus(ipFinderCleaner)).append(U.nl()); b.append(" Stats printer: ").append(threadStatus(statsPrinter)).append(U.nl()); @@ -1398,7 +1410,8 @@ class ServerImpl extends TcpDiscoveryImpl { private boolean recordable(TcpDiscoveryAbstractMessage msg) { return !(msg instanceof TcpDiscoveryHeartbeatMessage) && !(msg instanceof TcpDiscoveryStatusCheckMessage) && - !(msg instanceof TcpDiscoveryDiscardMessage); + !(msg instanceof TcpDiscoveryDiscardMessage) && + !(msg instanceof TcpDiscoveryConnectionCheckMessage); } /** @@ -1434,112 +1447,6 @@ class ServerImpl extends TcpDiscoveryImpl { } /** - * Thread that sends heartbeats. - */ - private class HeartbeatsSender extends IgniteSpiThread { - /** - * Constructor. 
- */ - private HeartbeatsSender() { - super(spi.ignite().name(), "tcp-disco-hb-sender", log); - - setPriority(spi.threadPri); - } - - /** {@inheritDoc} */ - @SuppressWarnings("BusyWait") - @Override protected void body() throws InterruptedException { - while (!isLocalNodeCoordinator()) - Thread.sleep(1000); - - if (log.isDebugEnabled()) - log.debug("Heartbeats sender has been started."); - - UUID nodeId = getConfiguredNodeId(); - - while (!isInterrupted()) { - if (spiStateCopy() != CONNECTED) { - if (log.isDebugEnabled()) - log.debug("Stopping heartbeats sender (SPI is not connected to topology)."); - - return; - } - - TcpDiscoveryHeartbeatMessage msg = new TcpDiscoveryHeartbeatMessage(nodeId); - - msg.verify(getLocalNodeId()); - - msgWorker.addMessage(msg); - - Thread.sleep(spi.hbFreq); - } - } - } - - /** - * Thread that sends status check messages to next node if local node has not - * been receiving heartbeats ({@link TcpDiscoveryHeartbeatMessage}) - * for {@link TcpDiscoverySpi#getMaxMissedHeartbeats()} * - * {@link TcpDiscoverySpi#getHeartbeatFrequency()}. - */ - private class CheckStatusSender extends IgniteSpiThread { - /** - * Constructor. - */ - private CheckStatusSender() { - super(spi.ignite().name(), "tcp-disco-status-check-sender", log); - - setPriority(spi.threadPri); - } - - /** {@inheritDoc} */ - @SuppressWarnings("BusyWait") - @Override protected void body() throws InterruptedException { - if (log.isDebugEnabled()) - log.debug("Status check sender has been started."); - - // Only 1 heartbeat missing is acceptable. Add 50 ms to avoid false alarm. - long checkTimeout = (long)spi.maxMissedHbs * spi.hbFreq + 50; - - long lastSent = 0; - - while (!isInterrupted()) { - // 1. Determine timeout. - if (lastSent < locNode.lastUpdateTime()) - lastSent = locNode.lastUpdateTime(); - - long timeout = (lastSent + checkTimeout) - U.currentTimeMillis(); - - if (timeout > 0) - Thread.sleep(timeout); - - // 2. Check if SPI is still connected. 
- if (spiStateCopy() != CONNECTED) { - if (log.isDebugEnabled()) - log.debug("Stopping status check sender (SPI is not connected to topology)."); - - return; - } - - // 3. Was there an update? - if (locNode.lastUpdateTime() > lastSent || !ring.hasRemoteNodes()) { - if (log.isDebugEnabled()) - log.debug("Skipping status check send " + - "[locNodeLastUpdate=" + U.format(locNode.lastUpdateTime()) + - ", hasRmts=" + ring.hasRemoteNodes() + ']'); - - continue; - } - - // 4. Send status check message. - lastSent = U.currentTimeMillis(); - - msgWorker.addMessage(new TcpDiscoveryStatusCheckMessage(locNode, null)); - } - } - } - - /** * Thread that cleans IP finder and keeps it in the correct state, unregistering * addresses of the nodes that has left the topology. * <p> @@ -1861,10 +1768,49 @@ class ServerImpl extends TcpDiscoveryImpl { /** Socket. */ private Socket sock; + /** Last time status message has been sent. */ + private long lastTimeStatusMsgSent; + + /** Incoming heartbeats check frequency. */ + private long hbCheckFreq = (long)spi.maxMissedHbs * spi.hbFreq + 50; + + /** Last time heartbeat message has been sent. */ + private long lastTimeHbMsgSent; + + /** Time when the last status message has been sent. */ + private long lastTimeConnCheckMsgSent; + + /** Flag that keeps info on whether the threshold is reached or not. */ + private boolean failureThresholdReached; + + /** Connection check frequency. */ + private long connCheckFreq; + /** */ protected RingMessageWorker() { - super("tcp-disco-msg-worker"); + super("tcp-disco-msg-worker", 10); + + initConnectionCheckFrequency(); + } + + /** + * Initializes connection check frequency. Used only when failure detection timeout is enabled. 
+ */ + private void initConnectionCheckFrequency() { + if (spi.failureDetectionTimeoutEnabled()) { + for (int i = 3; i > 0; i--) { + connCheckFreq = spi.failureDetectionTimeout() / i; + + if (connCheckFreq > 0) + break; + } + + assert connCheckFreq > 0; + + if (log.isDebugEnabled()) + log.debug("Connection check frequency is calculated: " + connCheckFreq); + } } /** @@ -1921,9 +1867,25 @@ class ServerImpl extends TcpDiscoveryImpl { if (spi.ensured(msg)) msgHist.add(msg); + if (msg.senderNodeId() != null && !msg.senderNodeId().equals(getLocalNodeId())) + // Reset the flag. + failureThresholdReached = false; + spi.stats.onMessageProcessingFinished(msg); } + /** {@inheritDoc} */ + @Override protected void noMessageLoop() { + if (locNode == null) + return; + + checkConnection(); + + sendHeartbeatMessage(); + + checkHeartbeatsReceiving(); + } + /** * Sends message across the ring. * @@ -1990,7 +1952,8 @@ class ServerImpl extends TcpDiscoveryImpl { if (debugMode) debugLog("No next node in topology."); - if (ring.hasRemoteNodes()) { + if (ring.hasRemoteNodes() && !(msg instanceof TcpDiscoveryConnectionCheckMessage) && + !(msg instanceof TcpDiscoveryStatusCheckMessage && msg.creatorNodeId().equals(locNodeId))) { msg.senderNodeId(locNodeId); addMessage(msg); @@ -2027,7 +1990,7 @@ class ServerImpl extends TcpDiscoveryImpl { List<InetSocketAddress> locNodeAddrs = U.arrayList(locNode.socketAddresses()); addr: for (InetSocketAddress addr : spi.getNodeAddresses(next, sameHost)) { - long ackTimeout0 = spi.ackTimeout; + long ackTimeout0 = spi.getAckTimeout(); if (locNodeAddrs.contains(addr)){ if (log.isDebugEnabled()) @@ -2037,8 +2000,15 @@ class ServerImpl extends TcpDiscoveryImpl { continue; } - for (int i = 0; i < spi.reconCnt; i++) { + int reconCnt = 0; + + IgniteSpiOperationTimeoutHelper timeoutHelper = null; + + while (true) { if (sock == null) { + if (timeoutHelper == null) + timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi); + nextNodeExists = false; boolean success 
= false; @@ -2049,14 +2019,16 @@ class ServerImpl extends TcpDiscoveryImpl { try { long tstamp = U.currentTimeMillis(); - sock = spi.openSocket(addr); + sock = spi.openSocket(addr, timeoutHelper); openSock = true; // Handshake. - writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId)); + writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId), + timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); - TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0); + TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, + timeoutHelper.nextTimeoutChunk(ackTimeout0)); if (locNodeId.equals(res.creatorNodeId())) { if (log.isDebugEnabled()) @@ -2140,8 +2112,13 @@ class ServerImpl extends TcpDiscoveryImpl { if (!openSock) break; // Don't retry if we can not establish connection. - if (e instanceof SocketTimeoutException || - X.hasCause(e, SocketTimeoutException.class)) { + if (!spi.failureDetectionTimeoutEnabled() && ++reconCnt == spi.getReconnectCount()) + break; + + if (timeoutHelper.checkFailureTimeoutReached(e)) + break; + else if (!spi.failureDetectionTimeoutEnabled() && (e instanceof + SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class))) { ackTimeout0 *= 2; if (!checkAckTimeout(ackTimeout0)) @@ -2156,9 +2133,13 @@ class ServerImpl extends TcpDiscoveryImpl { sock = null; } - else + else { // Next node exists and accepts incoming messages. nextNodeExists = true; + // Resetting timeout control object to let the code below to use a new one + // for the next bunch of operations. 
+ timeoutHelper = null; + } } } @@ -2195,8 +2176,12 @@ class ServerImpl extends TcpDiscoveryImpl { prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId); + if (timeoutHelper == null) + timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi); + try { - writeToSocket(sock, pendingMsg); + writeToSocket(sock, pendingMsg, timeoutHelper.nextTimeoutChunk( + spi.getSocketTimeout())); } finally { clearNodeAddedMessage(pendingMsg); @@ -2204,7 +2189,7 @@ class ServerImpl extends TcpDiscoveryImpl { spi.stats.onMessageSent(pendingMsg, U.currentTimeMillis() - tstamp); - int res = spi.readReceipt(sock, ackTimeout0); + int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)); if (log.isDebugEnabled()) log.debug("Pending message has been sent to next node [msg=" + msg.id() + @@ -2215,19 +2200,34 @@ class ServerImpl extends TcpDiscoveryImpl { debugLog("Pending message has been sent to next node [msg=" + msg.id() + ", pendingMsgId=" + pendingMsg + ", next=" + next.id() + ", res=" + res + ']'); + + // Resetting timeout control object to create a new one for the next bunch of + // operations. + timeoutHelper = null; } } - prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId); + if (msg instanceof TcpDiscoveryConnectionCheckMessage) { + if (!next.version().greaterThanEqual(TcpDiscoverySpi.FAILURE_DETECTION_MAJOR_VER, + TcpDiscoverySpi.FAILURE_DETECTION_MINOR_VER, + TcpDiscoverySpi.FAILURE_DETECTION_MAINT_VER)) + // Preserve backward compatibility with nodes of older versions. 
+ msg = new TcpDiscoveryStatusCheckMessage(locNode, null); + } + else + prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId); try { long tstamp = U.currentTimeMillis(); - writeToSocket(sock, msg); + if (timeoutHelper == null) + timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi); + + writeToSocket(sock, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp); - int res = spi.readReceipt(sock, ackTimeout0); + int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)); if (log.isDebugEnabled()) log.debug("Message has been sent to next node [msg=" + msg + @@ -2262,11 +2262,19 @@ class ServerImpl extends TcpDiscoveryImpl { onException("Failed to send message to next node [next=" + next.id() + ", msg=" + msg + ']', e); - if (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class)) { - ackTimeout0 *= 2; + if (timeoutHelper.checkFailureTimeoutReached(e)) + break; - if (!checkAckTimeout(ackTimeout0)) + if (!spi.failureDetectionTimeoutEnabled()) { + if (++reconCnt == spi.getReconnectCount()) break; + else if (e instanceof SocketTimeoutException || + X.hasCause(e, SocketTimeoutException.class)) { + ackTimeout0 *= 2; + + if (!checkAckTimeout(ackTimeout0)) + break; + } } } finally { @@ -2279,7 +2287,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (log.isDebugEnabled()) log.debug("Message has not been sent [next=" + next.id() + ", msg=" + msg + - ", i=" + i + ']'); + (!spi.failureDetectionTimeoutEnabled() ? ", i=" + reconCnt : "") + ']'); } } } // Try to reconnect. @@ -3342,7 +3350,8 @@ class ServerImpl extends TcpDiscoveryImpl { } else if (leftNode.equals(next) && sock != null) { try { - writeToSocket(sock, msg); + writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ? 
+ spi.failureDetectionTimeout() : spi.getSocketTimeout()); if (log.isDebugEnabled()) log.debug("Sent verified node left message to leaving node: " + msg); @@ -3991,6 +4000,77 @@ class ServerImpl extends TcpDiscoveryImpl { } } } + + /** + * Sends heartbeat message if needed. + */ + private void sendHeartbeatMessage() { + if (!isLocalNodeCoordinator()) + return; + + long elapsed = (lastTimeHbMsgSent + spi.hbFreq) - U.currentTimeMillis(); + + if (elapsed > 0) + return; + + TcpDiscoveryHeartbeatMessage msg = new TcpDiscoveryHeartbeatMessage(getConfiguredNodeId()); + + msg.verify(getLocalNodeId()); + + msgWorker.addMessage(msg); + + lastTimeHbMsgSent = U.currentTimeMillis(); + } + + /** + * Check the last time a heartbeat message received. If the time is bigger than {@code hbCheckTimeout} than + * {@link TcpDiscoveryStatusCheckMessage} is sent accros the ring. + */ + private void checkHeartbeatsReceiving() { + if (lastTimeStatusMsgSent < locNode.lastUpdateTime()) + lastTimeStatusMsgSent = locNode.lastUpdateTime(); + + long elapsed = (lastTimeStatusMsgSent + hbCheckFreq) - U.currentTimeMillis(); + + if (elapsed > 0) + return; + + msgWorker.addMessage(new TcpDiscoveryStatusCheckMessage(locNode, null)); + + lastTimeStatusMsgSent = U.currentTimeMillis(); + } + + /** + * Check connection aliveness status. + */ + private void checkConnection() { + if (!spi.failureDetectionTimeoutEnabled()) + return; + + if (!failureThresholdReached && U.currentTimeMillis() - locNode.lastDataReceivedTime() + >= spi.failureDetectionTimeout() && ring.hasRemoteNodes() && spiStateCopy() == CONNECTED) { + + log.info("Local node seems to be disconnected from topology (failure detection timeout " + + "is reached): [failureDetectionTimeout=" + spi.failureDetectionTimeout() + + ", connCheckFreq=" + connCheckFreq + ']'); + + failureThresholdReached = true; + + // Reset sent time deliberately to force sending connection check message. 
+ lastTimeConnCheckMsgSent = 0; + } + + long elapsed = (lastTimeConnCheckMsgSent + connCheckFreq) - U.currentTimeMillis(); + + if (elapsed > 0) + return; + + if (ring.hasRemoteNodes()) { + sendMessageAcrossRing(new TcpDiscoveryConnectionCheckMessage(locNode)); + + lastTimeConnCheckMsgSent = U.currentTimeMillis(); + } + } } /** @@ -4186,14 +4266,17 @@ class ServerImpl extends TcpDiscoveryImpl { TcpDiscoveryPingResponse res = new TcpDiscoveryPingResponse(locNodeId); + IgniteSpiOperationTimeoutHelper timeoutHelper = + new IgniteSpiOperationTimeoutHelper(spi); + if (req.clientNodeId() != null) { ClientMessageWorker clientWorker = clientMsgWorkers.get(req.clientNodeId()); if (clientWorker != null) - res.clientExists(clientWorker.ping()); + res.clientExists(clientWorker.ping(timeoutHelper)); } - spi.writeToSocket(sock, res); + spi.writeToSocket(sock, res, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); } else if (log.isDebugEnabled()) log.debug("Ignore ping request, node is stopping."); @@ -4214,7 +4297,8 @@ class ServerImpl extends TcpDiscoveryImpl { if (req.client()) res.clientAck(true); - spi.writeToSocket(sock, res); + spi.writeToSocket(sock, res, spi.failureDetectionTimeoutEnabled() ? + spi.failureDetectionTimeout() : spi.getSocketTimeout()); // It can happen if a remote node is stopped and it has a loopback address in the list of addresses, // the local node sends a handshake request message on the loopback address, so we get here. @@ -4323,6 +4407,9 @@ class ServerImpl extends TcpDiscoveryImpl { return; } + long socketTimeout = spi.failureDetectionTimeoutEnabled() ? 
spi.failureDetectionTimeout() : + spi.getSocketTimeout(); + while (!isInterrupted()) { try { TcpDiscoveryAbstractMessage msg = spi.marsh.unmarshal(in, U.gridClassLoader()); @@ -4337,7 +4424,12 @@ class ServerImpl extends TcpDiscoveryImpl { if (debugMode && recordable(msg)) debugLog("Message has been received: " + msg); - if (msg instanceof TcpDiscoveryJoinRequestMessage) { + if (msg instanceof TcpDiscoveryConnectionCheckMessage) { + spi.writeToSocket(msg, sock, RES_OK, socketTimeout); + + continue; + } + else if (msg instanceof TcpDiscoveryJoinRequestMessage) { TcpDiscoveryJoinRequestMessage req = (TcpDiscoveryJoinRequestMessage)msg; if (!req.responded()) { @@ -4355,7 +4447,7 @@ class ServerImpl extends TcpDiscoveryImpl { TcpDiscoverySpiState state = spiStateCopy(); if (state == CONNECTED) { - spi.writeToSocket(msg, sock, RES_OK); + spi.writeToSocket(msg, sock, RES_OK, socketTimeout); if (clientMsgWrk.getState() == State.NEW) clientMsgWrk.start(); @@ -4365,7 +4457,7 @@ class ServerImpl extends TcpDiscoveryImpl { continue; } else { - spi.writeToSocket(msg, sock, RES_CONTINUE_JOIN); + spi.writeToSocket(msg, sock, RES_CONTINUE_JOIN, socketTimeout); break; } @@ -4373,7 +4465,7 @@ class ServerImpl extends TcpDiscoveryImpl { } else if (msg instanceof TcpDiscoveryDuplicateIdMessage) { // Send receipt back. - spi.writeToSocket(msg, sock, RES_OK); + spi.writeToSocket(msg, sock, RES_OK, socketTimeout); boolean ignored = false; @@ -4402,7 +4494,7 @@ class ServerImpl extends TcpDiscoveryImpl { } else if (msg instanceof TcpDiscoveryAuthFailedMessage) { // Send receipt back. - spi.writeToSocket(msg, sock, RES_OK); + spi.writeToSocket(msg, sock, RES_OK, socketTimeout); boolean ignored = false; @@ -4431,7 +4523,7 @@ class ServerImpl extends TcpDiscoveryImpl { } else if (msg instanceof TcpDiscoveryCheckFailedMessage) { // Send receipt back. 
- spi.writeToSocket(msg, sock, RES_OK); + spi.writeToSocket(msg, sock, RES_OK, socketTimeout); boolean ignored = false; @@ -4460,7 +4552,7 @@ class ServerImpl extends TcpDiscoveryImpl { } else if (msg instanceof TcpDiscoveryLoopbackProblemMessage) { // Send receipt back. - spi.writeToSocket(msg, sock, RES_OK); + spi.writeToSocket(msg, sock, RES_OK, socketTimeout); boolean ignored = false; @@ -4509,7 +4601,7 @@ class ServerImpl extends TcpDiscoveryImpl { clientMsgWrk.addMessage(ack); } else - spi.writeToSocket(msg, sock, RES_OK); + spi.writeToSocket(msg, sock, RES_OK, socketTimeout); } catch (IgniteCheckedException e) { if (log.isDebugEnabled()) @@ -4610,8 +4702,11 @@ class ServerImpl extends TcpDiscoveryImpl { TcpDiscoverySpiState state = spiStateCopy(); + long socketTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() : + spi.getSocketTimeout(); + if (state == CONNECTED) { - spi.writeToSocket(msg, sock, RES_OK); + spi.writeToSocket(msg, sock, RES_OK, socketTimeout); if (log.isDebugEnabled()) log.debug("Responded to join request message [msg=" + msg + ", res=" + RES_OK + ']'); @@ -4648,7 +4743,7 @@ class ServerImpl extends TcpDiscoveryImpl { // Local node is stopping. Remote node should try next one. res = RES_CONTINUE_JOIN; - spi.writeToSocket(msg, sock, res); + spi.writeToSocket(msg, sock, res, socketTimeout); if (log.isDebugEnabled()) log.debug("Responded to join request message [msg=" + msg + ", res=" + res + ']'); @@ -4741,7 +4836,7 @@ class ServerImpl extends TcpDiscoveryImpl { * @param clientNodeId Node ID. 
*/ protected ClientMessageWorker(Socket sock, UUID clientNodeId) { - super("tcp-disco-client-message-worker"); + super("tcp-disco-client-message-worker", 2000); this.sock = sock; this.clientNodeId = clientNodeId; @@ -4791,7 +4886,8 @@ class ServerImpl extends TcpDiscoveryImpl { log.debug("Sending message ack to client [sock=" + sock + ", locNodeId=" + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']'); - writeToSocket(sock, msg); + writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ? + spi.failureDetectionTimeout() : spi.getSocketTimeout()); } } else { @@ -4802,7 +4898,8 @@ class ServerImpl extends TcpDiscoveryImpl { prepareNodeAddedMessage(msg, clientNodeId, null, null); - writeToSocket(sock, msg); + writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ? + spi.failureDetectionTimeout() : spi.getSocketTimeout()); } finally { clearNodeAddedMessage(msg); @@ -4836,10 +4933,11 @@ class ServerImpl extends TcpDiscoveryImpl { } /** + * @param timeoutHelper Timeout controller. * @return Ping result. * @throws InterruptedException If interrupted. */ - public boolean ping() throws InterruptedException { + public boolean ping(IgniteSpiOperationTimeoutHelper timeoutHelper) throws InterruptedException { if (spi.isNodeStopping0()) return false; @@ -4865,7 +4963,8 @@ class ServerImpl extends TcpDiscoveryImpl { } try { - return fut.get(spi.ackTimeout, TimeUnit.MILLISECONDS); + return fut.get(timeoutHelper.nextTimeoutChunk(spi.getAckTimeout()), + TimeUnit.MILLISECONDS); } catch (IgniteInterruptedCheckedException ignored) { throw new InterruptedException(); @@ -4904,12 +5003,18 @@ class ServerImpl extends TcpDiscoveryImpl { /** Backed interrupted flag. */ private volatile boolean interrupted; + /** Polling timeout. */ + private final long pollingTimeout; + /** * @param name Thread name. + * @param pollingTimeout Messages polling timeout. 
*/ - protected MessageWorkerAdapter(String name) { + protected MessageWorkerAdapter(String name, long pollingTimeout) { super(spi.ignite().name(), name, log); + this.pollingTimeout = pollingTimeout; + setPriority(spi.threadPri); } @@ -4919,12 +5024,12 @@ class ServerImpl extends TcpDiscoveryImpl { log.debug("Message worker started [locNodeId=" + getConfiguredNodeId() + ']'); while (!isInterrupted()) { - TcpDiscoveryAbstractMessage msg = queue.poll(2000, TimeUnit.MILLISECONDS); + TcpDiscoveryAbstractMessage msg = queue.poll(pollingTimeout, TimeUnit.MILLISECONDS); if (msg == null) - continue; - - processMessage(msg); + noMessageLoop(); + else + processMessage(msg); } } @@ -4968,16 +5073,24 @@ class ServerImpl extends TcpDiscoveryImpl { protected abstract void processMessage(TcpDiscoveryAbstractMessage msg); /** + * Called when there is no message to process giving ability to perform other activity. + */ + protected void noMessageLoop() { + // No-op. + } + + /** * @param sock Socket. * @param msg Message. + * @param timeout Socket timeout. * @throws IOException If IO failed. * @throws IgniteCheckedException If marshalling failed. 
*/ - protected final void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg) + protected final void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { bout.reset(); - spi.writeToSocket(sock, msg, bout); + spi.writeToSocket(sock, msg, bout, timeout); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java index c271b7c..4dacf45 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java @@ -131,6 +131,13 @@ abstract class TcpDiscoveryImpl { } /** + * Called when a chunk of data is received from a remote node. + */ + protected void onDataReceived() { + // No-op + } + + /** * @param log Logger. */ public abstract void dumpDebugInfo(IgniteLogger log); @@ -273,10 +280,10 @@ abstract class TcpDiscoveryImpl { * maximum acknowledgement timeout, {@code false} otherwise. */ protected boolean checkAckTimeout(long ackTimeout) { - if (ackTimeout > spi.maxAckTimeout) { + if (ackTimeout > spi.getMaxAckTimeout()) { LT.warn(log, null, "Acknowledgement timeout is greater than maximum acknowledgement timeout " + "(consider increasing 'maxAckTimeout' configuration property) " + - "[ackTimeout=" + ackTimeout + ", maxAckTimeout=" + spi.maxAckTimeout + ']'); + "[ackTimeout=" + ackTimeout + ", maxAckTimeout=" + spi.getMaxAckTimeout() + ']'); return false; }