Repository: incubator-ignite Updated Branches: refs/heads/ignite-752 a371f049b -> 9319206bd
ignite-752: network timeout must not affect failure detection threshold Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9319206b Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9319206b Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9319206b Branch: refs/heads/ignite-752 Commit: 9319206bd49146c45a4256753eb2768042011d22 Parents: a371f04 Author: Denis Magda <dma...@gridgain.com> Authored: Tue Jul 21 16:33:04 2015 +0300 Committer: Denis Magda <dma...@gridgain.com> Committed: Tue Jul 21 16:33:04 2015 +0300 ---------------------------------------------------------------------- .../ignite/spi/discovery/tcp/ClientImpl.java | 56 ++++++----------- .../ignite/spi/discovery/tcp/ServerImpl.java | 65 +++++++++----------- .../spi/discovery/tcp/TcpDiscoverySpi.java | 6 +- 3 files changed, 50 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9319206b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 00c668a..196c1b3 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -203,8 +203,7 @@ class ClientImpl extends TcpDiscoveryImpl { msgWorker.addMessage(SPI_STOP); try { - if (!leaveLatch.await(spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() : - spi.getNetworkTimeout(), MILLISECONDS)) + if (!leaveLatch.await(spi.netTimeout, MILLISECONDS)) U.warn(log, "Failed to left node: timeout [nodeId=" + locNode + ']'); } catch (InterruptedException ignored) { @@ -294,8 +293,7 @@ class ClientImpl extends TcpDiscoveryImpl { finalFut.onDone(false); } } - }, spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() : - spi.getNetworkTimeout()); + }, spi.netTimeout); sockWriter.sendMessage(new TcpDiscoveryClientPingRequest(getLocalNodeId(), nodeId)); } @@ -1076,8 +1074,7 @@ class ClientImpl extends TcpDiscoveryImpl { Exception err = null; - long timeout = join ? spi.getJoinTimeout() : (spi.failureDetectionThresholdEnabled() ? - spi.failureDetectionThreshold() : spi.getNetworkTimeout()); + long timeout = join ? spi.joinTimeout : spi.netTimeout; long startTime = U.currentTimeMillis(); @@ -1092,15 +1089,11 @@ class ClientImpl extends TcpDiscoveryImpl { if (join) { joinError(new IgniteSpiException("Join process timed out, connection failed and " + "failed to reconnect (consider increasing 'joinTimeout' configuration property) " + - "[networkTimeout=" + spi.getJoinTimeout() + ", sock=" + sock + ']')); + "[joinTimeout=" + spi.joinTimeout + ", sock=" + sock + ']')); } else - U.error(log, "Failed to reconnect to cluster " + - (spi.failureDetectionThresholdEnabled() ? - "(consider increasing 'failureDetectionThreshold' configuration property) " + - "[failureDetectionThreshold=" + spi.failureDetectionThreshold() : - "(consider increasing 'networkTimeout' configuration property) [networkTimeout=" + - spi.getNetworkTimeout()) + ", sock=" + sock + ']'); + U.error(log, "Failed to reconnect to cluster (consider increasing 'networkTimeout'" + + " configuration property) [networkTimeout=" + spi.netTimeout + ", sock=" + sock + ']'); return; } @@ -1116,8 +1109,7 @@ class ClientImpl extends TcpDiscoveryImpl { try { oldTimeout = sock.getSoTimeout(); - sock.setSoTimeout((int)(spi.failureDetectionThresholdEnabled() ? - spi.failureDetectionThreshold() : spi.getNetworkTimeout())); + sock.setSoTimeout((int)spi.netTimeout); InputStream in = new BufferedInputStream(sock.getInputStream()); @@ -1127,8 +1119,6 @@ class ClientImpl extends TcpDiscoveryImpl { List<TcpDiscoveryAbstractMessage> msgs = null; while (!isInterrupted()) { - // How to deal with failure detection threshold? It's bigger then networkTimeout and - // we can't just decrease it on every unmarshal operation. TcpDiscoveryAbstractMessage msg = spi.marsh.unmarshal(in, U.gridClassLoader()); if (msg instanceof TcpDiscoveryClientReconnectMessage) { @@ -1174,15 +1164,11 @@ class ClientImpl extends TcpDiscoveryImpl { if (join) msg = "Failed to connect to cluster (consider increasing 'joinTimeout' " + - "configuration property) [joinTimeout=" + spi.getJoinTimeout() + ", err=" + e + - ']'; + "configuration property) [joinTimeout=" + spi.joinTimeout + ", err=" + e + ']'; else - msg = "Failed to reconnect to cluster " + - (spi.failureDetectionThresholdEnabled() ? - "(consider increasing 'failureDetectionThreshold' configuration property) " + - "[failureDetectionThreshold=" + spi.failureDetectionThreshold() : - "(consider increasing 'networkTimeout' configuration property) " + - "[networkTimeout=" + spi.getNetworkTimeout()) + ", sock=" + sock + ']'; + msg = "Failed to reconnect to cluster (consider increasing 'networkTimeout' " + + "configuration property) [networkTimeout=" + spi.netTimeout + ", sock=" + sock + + ']'; U.warn(log, msg); @@ -1260,14 +1246,14 @@ class ClientImpl extends TcpDiscoveryImpl { if (state == STARTING) { joinError(new IgniteSpiException("Join process timed out, did not receive response for " + "join request (consider increasing 'joinTimeout' configuration property) " + - "[joinTimeout=" + spi.getJoinTimeout() + ", sock=" + currSock + ']')); + "[joinTimeout=" + spi.joinTimeout + ", sock=" + currSock + ']')); break; } else if (state == DISCONNECTED) { if (log.isDebugEnabled()) log.debug("Failed to reconnect, local node segmented " + - "[joinTimeout=" + spi.getJoinTimeout() + ']'); + "[joinTimeout=" + spi.joinTimeout + ']'); state = SEGMENTED; @@ -1326,10 +1312,7 @@ class ClientImpl extends TcpDiscoveryImpl { if (state != SEGMENTED && state != STOPPED) { if (log.isDebugEnabled()) { log.debug("Failed to restore closed connection, reconnect disabled, " + - (spi.failureDetectionThresholdEnabled() ? - "local node segmented [failureDetectionThreshold=" + - spi.failureDetectionThreshold() + ']' : - "local node segmented [networkTimeout=" + spi.getNetworkTimeout() + ']')); + "local node segmented [networkTimeout=" + spi.netTimeout + ']'); } state = SEGMENTED; @@ -1341,10 +1324,7 @@ class ClientImpl extends TcpDiscoveryImpl { if (state == STARTING || state == CONNECTED) { if (log.isDebugEnabled()) { log.debug("Failed to restore closed connection, will try to reconnect " + - (spi.failureDetectionThresholdEnabled() ? - "[failureDetectionThreshold=" + spi.failureDetectionThreshold() : - "[networkTimeout=" + spi.getNetworkTimeout()) + - ", joinTimeout=" + spi.getJoinTimeout() + ']'); + "[networkTimeout=" + spi.netTimeout + ", joinTimeout=" + spi.joinTimeout + ']'); } state = DISCONNECTED; @@ -1433,7 +1413,7 @@ class ClientImpl extends TcpDiscoveryImpl { joinCnt++; - T2<Socket, Boolean> joinRes = joinTopology(false, spi.getJoinTimeout()); + T2<Socket, Boolean> joinRes = joinTopology(false, spi.joinTimeout); if (joinRes == null) { if (join) @@ -1451,7 +1431,7 @@ class ClientImpl extends TcpDiscoveryImpl { sockWriter.setSocket(joinRes.get1(), joinRes.get2()); - if (spi.getJoinTimeout() > 0) { + if (spi.joinTimeout > 0) { final int joinCnt0 = joinCnt; timer.schedule(new TimerTask() { @@ -1459,7 +1439,7 @@ class ClientImpl extends TcpDiscoveryImpl { if (joinCnt == joinCnt0 && joining()) queue.add(JOIN_TIMEOUT); } - }, spi.getJoinTimeout()); + }, spi.joinTimeout); } sockReader.setSocket(joinRes.get1(), locNode.clientRouterNodeId()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9319206b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index b475768..02e13ba 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -278,8 +278,7 @@ class ServerImpl extends TcpDiscoveryImpl { msgWorker.addMessage(new TcpDiscoveryNodeLeftMessage(locNode.id())); synchronized (mux) { - long timeout = spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() : - spi.getNetworkTimeout(); + long timeout = spi.netTimeout; long threshold = U.currentTimeMillis() + timeout; @@ -535,7 +534,7 @@ class ServerImpl extends TcpDiscoveryImpl { timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout())); TcpDiscoveryPingResponse res = spi.readMessage(sock, null, timeoutCtrl.nextTimeoutChunk( - spi.getNetworkTimeout())); + spi.getAckTimeout())); if (locNodeId.equals(res.creatorNodeId())) { if (log.isDebugEnabled()) @@ -701,8 +700,7 @@ class ServerImpl extends TcpDiscoveryImpl { log.debug("Join request message has been sent (waiting for coordinator response)."); synchronized (mux) { - long timeout = spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() : - spi.getNetworkTimeout(); + long timeout = spi.netTimeout; long threshold = U.currentTimeMillis() + timeout; @@ -746,9 +744,8 @@ class ServerImpl extends TcpDiscoveryImpl { LT.warn(log, null, "Node has not been connected to topology and will repeat join process. " + "Check remote nodes logs for possible error messages. " + "Note that large topology may require significant time to start. " + - "Increase 'IgniteConfiguration.failureDetectionThreshold' configuration property " + - "if getting this message on the starting nodes [failureDetectionThreshold=" + - spi.failureDetectionThreshold() + ']'); + "Increase 'TcpDiscoverySpi.networkTimeout' configuration property " + + "if getting this message on the starting nodes [networkTimeout=" + spi.netTimeout + ']'); } } @@ -868,10 +865,10 @@ class ServerImpl extends TcpDiscoveryImpl { "(make sure IP finder addresses are correct and firewalls are disabled on all host machines): " + addrs); - if (spi.getJoinTimeout() > 0) { + if (spi.joinTimeout > 0) { if (noResStart == 0) noResStart = U.currentTimeMillis(); - else if (U.currentTimeMillis() - noResStart > spi.getJoinTimeout()) + else if (U.currentTimeMillis() - noResStart > spi.joinTimeout) throw new IgniteSpiException( "Failed to connect to any address from IP finder within join timeout " + "(make sure IP finder addresses are correct, and operating system firewalls are disabled " + @@ -2263,7 +2260,7 @@ class ServerImpl extends TcpDiscoveryImpl { // Handshake. writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId), - timeoutCtrl.nextTimeoutChunk(spi.getNetworkTimeout())); + timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout())); TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, timeoutCtrl.nextTimeoutChunk(ackTimeout0)); @@ -3829,27 +3826,29 @@ class ServerImpl extends TcpDiscoveryImpl { if (log.isDebugEnabled()) log.debug("Responded to status check message " + - "[recipient=" + msg.creatorNodeId() + ", status=" + msg.status() + ']'); - } - catch (IgniteSpiException e) { + "[recipient=" + msg.creatorNodeId() + ", status=" + msg.status + () + ']'); + } catch (IgniteSpiException e) { if (e.hasCause(SocketException.class)) { if (log.isDebugEnabled()) log.debug("Failed to respond to status check message (connection " + - "refused) [recipient=" + msg.creatorNodeId() + ", status=" + - msg.status() + ']'); + "refused) [recipient=" + msg.creatorNodeId() + ", status=" + + msg.status() + ']'); onException("Failed to respond to status check message (connection refused) " + - "[recipient=" + msg.creatorNodeId() + ", status=" + msg.status() + ']', e); - } - else { + "[recipient=" + msg.creatorNodeId() + ", status=" + msg + .status() + ']', + e); + } else { if (pingNode(msg.creatorNode())) // Node exists and accepts incoming connections. U.error(log, "Failed to respond to status check message [recipient=" + msg.creatorNodeId() + ", status=" + msg.status() + ']', e); else if (log.isDebugEnabled()) log.debug("Failed to respond to status check message (did the node " + - "stop?) [recipient=" + msg.creatorNodeId() + ", status=" + msg.status() - + ']'); + "stop?) [recipient=" + msg.creatorNodeId() + ", status=" + + msg.status() + + ']'); } } } @@ -4407,8 +4406,6 @@ class ServerImpl extends TcpDiscoveryImpl { try { InputStream in; - IgniteSpiOperationTimeoutController timeoutCtrl = new IgniteSpiOperationTimeoutController(spi); - try { // Set socket options. sock.setKeepAlive(true); @@ -4416,7 +4413,7 @@ class ServerImpl extends TcpDiscoveryImpl { int timeout = sock.getSoTimeout(); - sock.setSoTimeout((int)timeoutCtrl.nextTimeoutChunk(spi.getNetworkTimeout())); + sock.setSoTimeout((int)spi.netTimeout); for (IgniteInClosure<Socket> connLsnr : spi.incomeConnLsnrs) connLsnr.apply(sock); @@ -4461,8 +4458,7 @@ class ServerImpl extends TcpDiscoveryImpl { // Restore timeout. sock.setSoTimeout(timeout); - TcpDiscoveryAbstractMessage msg = spi.readMessage(sock, in, timeoutCtrl.nextTimeoutChunk( - spi.getNetworkTimeout())); + TcpDiscoveryAbstractMessage msg = spi.readMessage(sock, in, spi.netTimeout); // Ping. if (msg instanceof TcpDiscoveryPingRequest) { @@ -4471,6 +4467,9 @@ class ServerImpl extends TcpDiscoveryImpl { TcpDiscoveryPingResponse res = new TcpDiscoveryPingResponse(locNodeId); + IgniteSpiOperationTimeoutController timeoutCtrl = + new IgniteSpiOperationTimeoutController(spi); + if (req.clientNodeId() != null) { ClientMessageWorker clientWorker = clientMsgWorkers.get(req.clientNodeId()); @@ -4486,8 +4485,6 @@ class ServerImpl extends TcpDiscoveryImpl { return; } - - // Handshake. TcpDiscoveryHandshakeRequest req = (TcpDiscoveryHandshakeRequest)msg; @@ -4501,7 +4498,8 @@ class ServerImpl extends TcpDiscoveryImpl { if (req.client()) res.clientAck(true); - spi.writeToSocket(sock, res, timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout())); + spi.writeToSocket(sock, res, spi.failureDetectionThresholdEnabled() ? + spi.failureDetectionThreshold() : spi.getSocketTimeout()); // It can happen if a remote node is stopped and it has a loopback address in the list of addresses, // the local node sends a handshake request message on the loopback address, so we get here. @@ -4591,15 +4589,10 @@ class ServerImpl extends TcpDiscoveryImpl { onException("Caught exception on handshake [err=" + e +", sock=" + sock + ']', e); - if (timeoutCtrl.checkFailureDetectionThresholdReached(e)) - LT.warn(log, null, "Socket operations timed out " + - "(consider increasing 'failureDetectionThreshold' configuration property) " + - "[failureDetectionThreshold=" + spi.failureDetectionThreshold() + ']'); - - else if (!spi.failureDetectionThresholdEnabled() && e.hasCause(SocketTimeoutException.class)) + if (e.hasCause(SocketTimeoutException.class)) LT.warn(log, null, "Socket operation timed out on handshake " + "(consider increasing 'networkTimeout' configuration property) " + - "[netTimeout=" + spi.getNetworkTimeout() + ']'); + "[netTimeout=" + spi.netTimeout + ']'); else if (e.hasCause(ClassNotFoundException.class)) LT.warn(log, null, "Failed to read message due to ClassNotFoundException " + http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9319206b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 035ac1a..87848d4 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -239,11 +239,11 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T private long ackTimeout; // Must be initialized in the constructor of child class. /** Network timeout. */ - private long netTimeout = DFLT_NETWORK_TIMEOUT; + protected long netTimeout = DFLT_NETWORK_TIMEOUT; /** Join timeout. */ @SuppressWarnings("RedundantFieldInitialization") - private long joinTimeout = DFLT_JOIN_TIMEOUT; + protected long joinTimeout = DFLT_JOIN_TIMEOUT; /** Thread priority for all threads started by SPI. */ protected int threadPri = DFLT_THREAD_PRI; @@ -1666,7 +1666,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T } if (!failureDetectionThresholdEnabled()) { - assertParameter(netTimeout > 0, "networkTimeout > 0"); assertParameter(sockTimeout > 0, "sockTimeout > 0"); assertParameter(ackTimeout > 0, "ackTimeout > 0"); assertParameter(maxAckTimeout > ackTimeout, "maxAckTimeout > ackTimeout"); @@ -1675,6 +1674,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T else assertParameter(connCheckFreq < failureDetectionThreshold(), "failureDetectionThreshold > connCheckFreq"); + assertParameter(netTimeout > 0, "networkTimeout > 0"); assertParameter(ipFinder != null, "ipFinder != null"); assertParameter(hbFreq > 0, "heartbeatFreq > 0");