ignite-752: reimplemented pingNode with failure detection threshold
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/567aec10 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/567aec10 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/567aec10 Branch: refs/heads/ignite-752 Commit: 567aec1029a6302016e3fe189f5a973014b54d94 Parents: 1a7421e Author: Denis Magda <dma...@gridgain.com> Authored: Wed Jul 15 12:29:49 2015 +0300 Committer: Denis Magda <dma...@gridgain.com> Committed: Wed Jul 15 12:29:49 2015 +0300 ---------------------------------------------------------------------- .../configuration/IgniteConfiguration.java | 25 ++++- .../org/apache/ignite/spi/IgniteSpiAdapter.java | 103 ++++++++++++++++++ .../ignite/spi/discovery/tcp/ServerImpl.java | 30 +++++- .../spi/discovery/tcp/TcpDiscoverySpi.java | 108 +++++++++++-------- 4 files changed, 217 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/567aec10/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java index 2d36c7a..b3d2bfc 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java @@ -189,6 +189,9 @@ public class IgniteConfiguration { /** Default value for cache sanity check enabled flag. */ public static final boolean DFLT_CACHE_SANITY_CHECK_ENABLED = true; + /** Default failure detection threshold used by DiscoverySpi and CommunicationSpi in millis. */ + public static final int DFLT_FAILURE_DETECTION_THRESHOLD = 10_000; + /** Optional grid name. */ private String gridName; @@ -366,6 +369,9 @@ public class IgniteConfiguration { /** Port number range for time server. */ private int timeSrvPortRange = DFLT_TIME_SERVER_PORT_RANGE; + /** Failure detection threshold used by DiscoverySpi and CommunicationSpi. */ + private int failureDetectionThreshold = DFLT_FAILURE_DETECTION_THRESHOLD; + /** Property names to include into node attributes. */ private String[] includeProps; @@ -444,7 +450,7 @@ public class IgniteConfiguration { clockSyncSamples = cfg.getClockSyncSamples(); deployMode = cfg.getDeploymentMode(); discoStartupDelay = cfg.getDiscoveryStartupDelay(); - pubPoolSize = cfg.getPublicThreadPoolSize(); + failureDetectionThreshold = cfg.getFailureDetectionThreshold(); ggHome = cfg.getIgniteHome(); ggWork = cfg.getWorkDirectory(); gridName = cfg.getGridName(); @@ -474,6 +480,7 @@ public class IgniteConfiguration { p2pMissedCacheSize = cfg.getPeerClassLoadingMissedResourcesCacheSize(); p2pPoolSize = cfg.getPeerClassLoadingThreadPoolSize(); pluginCfgs = cfg.getPluginConfigurations(); + pubPoolSize = cfg.getPublicThreadPoolSize(); segChkFreq = cfg.getSegmentCheckFrequency(); segPlc = cfg.getSegmentationPolicy(); segResolveAttempts = cfg.getSegmentationResolveAttempts(); @@ -1629,6 +1636,22 @@ public class IgniteConfiguration { } /** + * TODO: IGNITE-752 + * @return + */ + public int getFailureDetectionThreshold() { + return failureDetectionThreshold; + } + + /** + * TODO: IGNITE-752 + * @param failureDetectionThreshold + */ + public void setFailureDetectionThreshold(int failureDetectionThreshold) { + this.failureDetectionThreshold = failureDetectionThreshold; + } + + /** * Should return fully configured load balancing SPI implementation. If not provided, * {@link RoundRobinLoadBalancingSpi} will be used. * http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/567aec10/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java index 5e557bd..82ed3d0 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java @@ -19,6 +19,7 @@ package org.apache.ignite.spi; import org.apache.ignite.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.managers.communication.*; @@ -35,6 +36,7 @@ import org.jetbrains.annotations.*; import javax.management.*; import java.io.*; +import java.net.*; import java.text.*; import java.util.*; @@ -73,6 +75,15 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement /** Discovery listener. */ private GridLocalEventListener paramsLsnr; + /** Failure detection threshold will not be used usage switch. */ + private boolean failureDetectionThresholdEnabled = true; + + /** + * Failure detection threshold. Initialized with the value of + * {@link IgniteConfiguration#getFailureDetectionThreshold()}. + */ + private long failureDetectionThreshold; + /** * Creates new adapter and initializes it from the current (this) class. * SPI name will be initialized to the simple name of the class @@ -194,6 +205,15 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement spiCtx = new GridDummySpiContext(locNode, true, spiCtx); } + /** {@inheritDoc} */ + @Override public void spiStart(@Nullable String gridName) throws IgniteSpiException { + if (!failureDetectionThresholdEnabled) { + failureDetectionThreshold = ignite.configuration().getFailureDetectionThreshold(); + + assertParameter(failureDetectionThreshold > 0, "failureDetectionThreshold > 0"); + } + } + /** * Inject ignite instance. */ @@ -560,6 +580,89 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement } /** + * TODO: IGNITE-752 + * @param dfltTimeout + * @return + */ + public long firstNetOperationTimeout(long dfltTimeout) { + return !failureDetectionThresholdEnabled ? dfltTimeout : failureDetectionThreshold; + } + + /** + * TODO: IGNITE-752 + * @param curTimeout + * @param lastOperStartTime + * @param dfltTimeout + * @return + * @throws IOException + */ + public long nextNetOperationTimeout(long curTimeout, long lastOperStartTime, long dfltTimeout) + throws NetOperationTimeoutException { + if (!failureDetectionThresholdEnabled) + return dfltTimeout; + + long timeLeft = curTimeout - lastOperStartTime; + + if (timeLeft <= 0) + throw new NetOperationTimeoutException("Network operation timed out. Increase failure detection threshold" + + " using IgniteConfiguration.setFailureDetectionThreshold() or set SPI specific timeouts manually." + + " Current failure detection threshold: " + failureDetectionThreshold); + + return timeLeft; + } + + /** + * TODO: IGNITE-752 + * @param e + * @return + */ + public boolean checkFailureDetectionThresholdReached(Exception e) { + if (!failureDetectionThresholdEnabled) + return false; + + return e instanceof NetOperationTimeoutException || e instanceof SocketTimeoutException || + X.hasCause(e, NetOperationTimeoutException.class, SocketException.class); + } + + /** + * TODO: IGNITE-752 + * @param enabled + */ + public void failureDetectionThresholdEnabled(boolean enabled) { + failureDetectionThresholdEnabled = enabled; + } + + /** + * TODO: IGNITE-752 + * @return + */ + public boolean failureDetectionThresholdEnabled() { + return failureDetectionThresholdEnabled; + } + + /** + * TODO: IGNITE-752 + * @return + */ + public long failureDetectionThreshold() { + return failureDetectionThreshold; + } + + + /** + * TODO: IGNITE-752 + */ + public static class NetOperationTimeoutException extends IgniteCheckedException { + /** + * Constructor. + * @param msg Error message. + */ + public NetOperationTimeoutException(String msg) { + super(msg); + } + } + + /** * Temporarily SPI context. */ private class GridDummySpiContext implements IgniteSpiContext { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/567aec10/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index d51293e..9dd565c 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -50,6 +50,7 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.*; import static org.apache.ignite.spi.IgnitePortProtocol.*; import static org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoverySpiState.*; import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessage.*; +import static org.apache.ignite.spi.IgniteSpiAdapter.NetOperationTimeoutException; /** * @@ -508,18 +509,32 @@ class ServerImpl extends TcpDiscoveryImpl { try { Socket sock = null; - for (int i = 0; i < spi.reconCnt; i++) { + long timeout = 0; + long lastOperStartTs = 0; + + int reconCnt = 0; + + while (true) { try { if (addr.isUnresolved()) addr = new InetSocketAddress(InetAddress.getByName(addr.getHostName()), addr.getPort()); - long tstamp = U.currentTimeMillis(); + timeout = lastOperStartTs == 0 ? spi.firstNetOperationTimeout(spi.getSocketTimeout()) : + spi.nextNetOperationTimeout(timeout, lastOperStartTs, spi.getSocketTimeout()); + + long tstamp = lastOperStartTs = U.currentTimeMillis(); + + sock = spi.openSocket(addr, timeout); - sock = spi.openSocket(addr); + timeout = spi.nextNetOperationTimeout(timeout, lastOperStartTs, spi.getSocketTimeout()); + lastOperStartTs = U.currentTimeMillis(); - spi.writeToSocket(sock, new TcpDiscoveryPingRequest(locNodeId, clientNodeId)); + spi.writeToSocket(sock, new TcpDiscoveryPingRequest(locNodeId, clientNodeId), timeout); - TcpDiscoveryPingResponse res = spi.readMessage(sock, null, spi.netTimeout); + timeout = spi.nextNetOperationTimeout(timeout, lastOperStartTs, spi.getNetworkTimeout()); + lastOperStartTs = U.currentTimeMillis(); + + TcpDiscoveryPingResponse res = spi.readMessage(sock, null, timeout); if (locNodeId.equals(res.creatorNodeId())) { if (log.isDebugEnabled()) @@ -541,6 +556,11 @@ class ServerImpl extends TcpDiscoveryImpl { errs = new ArrayList<>(); errs.add(e); + + if (spi.checkFailureDetectionThresholdReached(e)) + break; + else if (!spi.failureDetectionThresholdEnabled() && ++reconCnt == spi.getReconnectCount()) + break; } finally { U.closeQuiet(sock); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/567aec10/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 4240d6a..e5d5cd6 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -327,9 +327,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T /** */ private boolean forceSrvMode; - /** User manually set one of the failure detection timeouts. Failure detection threshold will not be used. */ - private boolean manualFailureDetectionSetup; - /** {@inheritDoc} */ @Override public String getSpiState() { return impl.getSpiState(); @@ -502,7 +499,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T @IgniteSpiConfiguration(optional = true) public TcpDiscoverySpi setReconnectCount(int reconCnt) { this.reconCnt = reconCnt; - manualFailureDetectionSetup = true; + + failureDetectionThresholdEnabled(false); return this; } @@ -529,7 +527,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T @IgniteSpiConfiguration(optional = true) public TcpDiscoverySpi setMaxAckTimeout(long maxAckTimeout) { this.maxAckTimeout = maxAckTimeout; - manualFailureDetectionSetup = true; + + failureDetectionThresholdEnabled(false); return this; } @@ -702,7 +701,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T @IgniteSpiConfiguration(optional = true) public TcpDiscoverySpi setSocketTimeout(long sockTimeout) { this.sockTimeout = sockTimeout; - manualFailureDetectionSetup = true; + + failureDetectionThresholdEnabled(false); return this; } @@ -720,7 +720,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T @IgniteSpiConfiguration(optional = true) public TcpDiscoverySpi setAckTimeout(long ackTimeout) { this.ackTimeout = ackTimeout; - manualFailureDetectionSetup = true; + + failureDetectionThresholdEnabled(false); return this; } @@ -735,7 +736,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T @IgniteSpiConfiguration(optional = true) public TcpDiscoverySpi setNetworkTimeout(long netTimeout) { this.netTimeout = netTimeout; - manualFailureDetectionSetup = true; + + failureDetectionThresholdEnabled(false); return this; } @@ -1101,10 +1103,12 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T /** * @param sockAddr Remote address. + * @param timeout Socket opening timeout. * @return Opened socket. * @throws IOException If failed. */ - protected Socket openSocket(InetSocketAddress sockAddr) throws IOException { + protected Socket openSocket(InetSocketAddress sockAddr, long timeout) throws IOException, + NetOperationTimeoutException { assert sockAddr != null; InetSocketAddress resolved = sockAddr.isUnresolved() ? @@ -1120,9 +1124,13 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T sock.setTcpNoDelay(true); - sock.connect(resolved, (int)sockTimeout); + long startTs = U.currentTimeMillis(); + + sock.connect(resolved, (int)timeout); + + timeout = nextNetOperationTimeout(timeout, startTs, sockTimeout); - writeToSocket(sock, U.IGNITE_HEADER); + writeToSocket(sock, U.IGNITE_HEADER, timeout); return sock; } @@ -1132,14 +1140,16 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T * * @param sock Socket. * @param data Raw data to write. + * @param timeout Socket write timeout. * @throws IOException If IO failed or write timed out. */ @SuppressWarnings("ThrowFromFinallyBlock") - protected void writeToSocket(Socket sock, byte[] data) throws IOException { + private void writeToSocket(Socket sock, byte[] data, long timeout) throws IOException { assert sock != null; assert data != null; - SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout); + //SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout); + SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + timeout); addTimeoutObject(obj); @@ -1175,11 +1185,13 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T * * @param sock Socket. * @param msg Message. + * @param timeout Socket write timeout. * @throws IOException If IO failed or write timed out. * @throws IgniteCheckedException If marshalling failed. */ - protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg) throws IOException, IgniteCheckedException { - writeToSocket(sock, msg, new GridByteArrayOutputStream(8 * 1024)); // 8K. + protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) + throws IOException, IgniteCheckedException { + writeToSocket(sock, msg, new GridByteArrayOutputStream(8 * 1024), timeout); // 8K. } /** @@ -1192,8 +1204,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T * @throws IgniteCheckedException If marshalling failed. */ @SuppressWarnings("ThrowFromFinallyBlock") - protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, GridByteArrayOutputStream bout) - throws IOException, IgniteCheckedException { + protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, GridByteArrayOutputStream bout, + long timeout) throws IOException, IgniteCheckedException { assert sock != null; assert msg != null; assert bout != null; @@ -1201,7 +1213,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T // Marshall message first to perform only write after. marsh.marshal(msg, bout); - SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout); + SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + timeout); addTimeoutObject(obj); @@ -1548,39 +1560,43 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T /** {@inheritDoc} */ @Override public void spiStart(@Nullable String gridName) throws IgniteSpiException { - if (!forceSrvMode && (Boolean.TRUE.equals(ignite.configuration().isClientMode()))) { - if (ackTimeout == 0) - ackTimeout = DFLT_ACK_TIMEOUT_CLIENT; + super.spiStart(gridName); - if (sockTimeout == 0) - sockTimeout = DFLT_SOCK_TIMEOUT_CLIENT; + if (!failureDetectionThresholdEnabled()) { + if (!forceSrvMode && (Boolean.TRUE.equals(ignite.configuration().isClientMode()))) { + if (ackTimeout == 0) + ackTimeout = DFLT_ACK_TIMEOUT_CLIENT; - impl = new ClientImpl(this); + if (sockTimeout == 0) + sockTimeout = DFLT_SOCK_TIMEOUT_CLIENT; - ctxInitLatch.countDown(); - } - else { - if (ackTimeout == 0) - ackTimeout = DFLT_ACK_TIMEOUT; + impl = new ClientImpl(this); + + ctxInitLatch.countDown(); + } else { + if (ackTimeout == 0) + ackTimeout = DFLT_ACK_TIMEOUT; - if (sockTimeout == 0) - sockTimeout = DFLT_SOCK_TIMEOUT; + if (sockTimeout == 0) + sockTimeout = DFLT_SOCK_TIMEOUT; - impl = new ServerImpl(this); + impl = new ServerImpl(this); + } + + assertParameter(netTimeout > 0, "networkTimeout > 0"); + assertParameter(sockTimeout > 0, "sockTimeout > 0"); + assertParameter(ackTimeout > 0, "ackTimeout > 0"); + assertParameter(maxAckTimeout > ackTimeout, "maxAckTimeout > ackTimeout"); + assertParameter(reconCnt > 0, "reconnectCnt > 0"); } assertParameter(ipFinder != null, "ipFinder != null"); assertParameter(hbFreq > 0, "heartbeatFreq > 0"); - assertParameter(netTimeout > 0, "networkTimeout > 0"); - assertParameter(sockTimeout > 0, "sockTimeout > 0"); - assertParameter(ackTimeout > 0, "ackTimeout > 0"); assertParameter(ipFinderCleanFreq > 0, "ipFinderCleanFreq > 0"); assertParameter(locPort > 1023, "localPort > 1023"); assertParameter(locPortRange >= 0, "localPortRange >= 0"); assertParameter(locPort + locPortRange <= 0xffff, "locPort + locPortRange <= 0xffff"); - assertParameter(maxAckTimeout > ackTimeout, "maxAckTimeout > ackTimeout"); - assertParameter(reconCnt > 0, "reconnectCnt > 0"); assertParameter(maxMissedHbs > 0, "maxMissedHeartbeats > 0"); assertParameter(maxMissedClientHbs > 0, "maxMissedClientHeartbeats > 0"); assertParameter(threadPri > 0, "threadPri > 0"); @@ -1598,11 +1614,17 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T log.debug(configInfo("localPort", locPort)); log.debug(configInfo("localPortRange", locPortRange)); log.debug(configInfo("threadPri", threadPri)); - log.debug(configInfo("networkTimeout", netTimeout)); - log.debug(configInfo("sockTimeout", sockTimeout)); - log.debug(configInfo("ackTimeout", ackTimeout)); - log.debug(configInfo("maxAckTimeout", maxAckTimeout)); - log.debug(configInfo("reconnectCount", reconCnt)); + + if (!failureDetectionThresholdEnabled()) { + log.debug(configInfo("networkTimeout", netTimeout)); + log.debug(configInfo("sockTimeout", sockTimeout)); + log.debug(configInfo("ackTimeout", ackTimeout)); + log.debug(configInfo("maxAckTimeout", maxAckTimeout)); + log.debug(configInfo("reconnectCount", reconCnt)); + } + else + log.debug(configInfo("failureDetectionThreshold", failureDetectionThreshold())); + log.debug(configInfo("ipFinder", ipFinder)); log.debug(configInfo("ipFinderCleanFreq", ipFinderCleanFreq)); log.debug(configInfo("heartbeatFreq", hbFreq)); @@ -1611,7 +1633,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T } // Warn on odd network timeout. - if (netTimeout < 3000) + if (!failureDetectionThresholdEnabled() && netTimeout < 3000) U.warn(log, "Network timeout is too low (at least 3000 ms recommended): " + netTimeout); registerMBean(gridName, this, TcpDiscoverySpiMBean.class);