ignite-752: support failure detection threshold in ClientImpl
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c3c0ef87 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c3c0ef87 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c3c0ef87 Branch: refs/heads/ignite-752 Commit: c3c0ef87be79d71aa922d9a11e2d205df19a7829 Parents: 8e37d32 Author: Denis Magda <dma...@gridgain.com> Authored: Thu Jul 16 11:52:57 2015 +0300 Committer: Denis Magda <dma...@gridgain.com> Committed: Thu Jul 16 11:52:57 2015 +0300 ---------------------------------------------------------------------- .../ignite/spi/discovery/tcp/ClientImpl.java | 83 ++++++++++++++------ .../tcp/TcpClientDiscoverySpiSelfTest.java | 14 ++-- 2 files changed, 69 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c3c0ef87/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 3f05f59..62b2e35 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -194,7 +194,8 @@ class ClientImpl extends TcpDiscoveryImpl { msgWorker.addMessage(SPI_STOP); try { - if (!leaveLatch.await(spi.netTimeout, MILLISECONDS)) + if (!leaveLatch.await(spi.failureDetectionThresholdEnabled() ? 
spi.failureDetectionThreshold() : + spi.getNetworkTimeout(), MILLISECONDS)) U.warn(log, "Failed to left node: timeout [nodeId=" + locNode + ']'); } catch (InterruptedException ignored) { @@ -272,7 +273,7 @@ class ClientImpl extends TcpDiscoveryImpl { if (pingFuts.remove(nodeId, finalFut)) finalFut.onDone(false); } - }, spi.netTimeout); + }, spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() : spi.getNetworkTimeout()); sockWriter.sendMessage(new TcpDiscoveryClientPingRequest(getLocalNodeId(), nodeId)); } @@ -456,13 +457,17 @@ class ClientImpl extends TcpDiscoveryImpl { Collection<Throwable> errs = null; - long ackTimeout0 = spi.ackTimeout; + long ackTimeout0 = spi.getAckTimeout(); + + int reconCnt = 0; int connectAttempts = 1; UUID locNodeId = getLocalNodeId(); - for (int i = 0; i < spi.reconCnt; i++) { + IgniteSpiOperationTimeoutController timeoutCtrl = new IgniteSpiOperationTimeoutController(spi); + + while (true) { boolean openSock = false; Socket sock = null; @@ -470,7 +475,7 @@ class ClientImpl extends TcpDiscoveryImpl { try { long tstamp = U.currentTimeMillis(); - sock = spi.openSocket(addr); + sock = spi.openSocket(addr, timeoutCtrl); openSock = true; @@ -478,7 +483,7 @@ class ClientImpl extends TcpDiscoveryImpl { req.client(true); - spi.writeToSocket(sock, req); + spi.writeToSocket(sock, req, timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout())); TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0); @@ -499,7 +504,7 @@ class ClientImpl extends TcpDiscoveryImpl { msg.client(true); - spi.writeToSocket(sock, msg); + spi.writeToSocket(sock, msg, timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout())); spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp); @@ -522,6 +527,12 @@ class ClientImpl extends TcpDiscoveryImpl { errs.add(e); + if (timeoutCtrl.checkFailureDetectionThresholdReached(e)) + break; + + if (!spi.failureDetectionThresholdEnabled() && ++reconCnt == spi.getReconnectCount()) + break; + if 
(!openSock) { // Reconnect for the second time, if connection is not established. if (connectAttempts < 2) { @@ -533,7 +544,8 @@ class ClientImpl extends TcpDiscoveryImpl { break; // Don't retry if we can not establish connection. } - if (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class)) { + if (!spi.failureDetectionThresholdEnabled() && (e instanceof SocketTimeoutException || + X.hasCause(e, SocketTimeoutException.class))) { ackTimeout0 *= 2; if (!checkAckTimeout(ackTimeout0)) @@ -825,11 +837,17 @@ class ClientImpl extends TcpDiscoveryImpl { /** */ private final Queue<TcpDiscoveryAbstractMessage> queue = new ArrayDeque<>(); + /** */ + private final long socketTimeout; + /** * */ protected SocketWriter() { super(spi.ignite().name(), "tcp-client-disco-sock-writer", log); + + socketTimeout = spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() : + spi.getSocketTimeout(); } /** @@ -893,7 +911,7 @@ class ClientImpl extends TcpDiscoveryImpl { msgLsnr.apply(msg); try { - spi.writeToSocket(sock, msg); + spi.writeToSocket(sock, msg, socketTimeout); msg = null; } @@ -954,7 +972,8 @@ class ClientImpl extends TcpDiscoveryImpl { Exception err = null; - long timeout = join ? spi.joinTimeout : spi.netTimeout; + long timeout = join ? spi.getJoinTimeout() : (spi.failureDetectionThresholdEnabled() ? 
spi.failureDetectionThreshold() : spi.getNetworkTimeout()); long startTime = U.currentTimeMillis(); @@ -966,11 +985,17 @@ class ClientImpl extends TcpDiscoveryImpl { if (join) { joinError(new IgniteSpiException("Join process timed out, connection failed and " + "failed to reconnect (consider increasing 'joinTimeout' configuration property) " + - "[networkTimeout=" + spi.joinTimeout + ", sock=" + sock + ']')); + "[networkTimeout=" + spi.getJoinTimeout() + ", sock=" + sock + ']')); } else - U.error(log, "Failed to reconnect to cluster (consider increasing 'networkTimeout' " + - "configuration property) [networkTimeout=" + spi.netTimeout + ", sock=" + sock + ']'); + U.error(log, "Failed to reconnect to cluster " + + (spi.failureDetectionThresholdEnabled() ? + "(consider increasing 'failureDetectionThreshold' configuration property) " + + "[failureDetectionThreshold=" + spi.failureDetectionThreshold() + : + "(consider increasing 'networkTimeout' configuration property) [networkTimeout=" + + spi.getNetworkTimeout()) + + ", sock=" + sock + ']'); return; } @@ -983,7 +1008,8 @@ class ClientImpl extends TcpDiscoveryImpl { try { oldTimeout = sock.getSoTimeout(); - sock.setSoTimeout((int)spi.netTimeout); + sock.setSoTimeout((int)(spi.failureDetectionThresholdEnabled() ? + spi.failureDetectionThreshold() : spi.getNetworkTimeout())); InputStream in = new BufferedInputStream(sock.getInputStream()); @@ -993,6 +1019,8 @@ class ClientImpl extends TcpDiscoveryImpl { List<TcpDiscoveryAbstractMessage> msgs = null; while (!isInterrupted()) { + // How to deal with failure detection threshold? It's bigger than networkTimeout and + // we can't just decrease it on every unmarshal operation.
TcpDiscoveryAbstractMessage msg = spi.marsh.unmarshal(in, U.gridClassLoader()); if (msg instanceof TcpDiscoveryClientReconnectMessage) { @@ -1028,10 +1056,21 @@ class ClientImpl extends TcpDiscoveryImpl { log.error("Reconnect error [join=" + join + ", timeout=" + timeout + ']', e); if (timeout > 0 && (U.currentTimeMillis() - startTime) > timeout) { - String msg = join ? "Failed to connect to cluster (consider increasing 'joinTimeout' " + - "configuration property) [joinTimeout=" + spi.joinTimeout + ", err=" + e + ']' : - "Failed to reconnect to cluster (consider increasing 'networkTimeout' " + - "configuration property) [networkTimeout=" + spi.netTimeout + ", err=" + e + ']'; + String msg; + + if (join) + msg = "Failed to connect to cluster (consider increasing 'joinTimeout' " + + "configuration property) [joinTimeout=" + spi.getJoinTimeout() + ", err=" + e + + ']'; + else + msg = "Failed to reconnect to cluster " + + (spi.failureDetectionThresholdEnabled() ? + "(consider increasing 'failureDetectionThreshold' configuration property) " + + "[failureDetectionThreshold=" + spi.failureDetectionThreshold() + : + "(consider increasing 'networkTimeout' configuration property) " + + "[networkTimeout=" + spi.getNetworkTimeout()) + ", sock=" + sock + ']'; + U.warn(log, msg); @@ -1094,7 +1133,7 @@ class ClientImpl extends TcpDiscoveryImpl { spi.stats.onJoinStarted(); try { - final Socket sock = joinTopology(false, spi.joinTimeout); + final Socket sock = joinTopology(false, spi.getJoinTimeout()); if (sock == null) { joinError(new IgniteSpiException("Join process timed out.")); @@ -1106,13 +1145,13 @@ class ClientImpl extends TcpDiscoveryImpl { sockWriter.setSocket(sock); - if (spi.joinTimeout > 0) { + if (spi.getJoinTimeout() > 0) { timer.schedule(new TimerTask() { @Override public void run() { if (joinLatch.getCount() > 0) queue.add(JOIN_TIMEOUT); } - }, spi.joinTimeout); + }, spi.getJoinTimeout()); } sockReader.setSocket(sock, locNode.clientRouterNodeId()); @@ -1124,7 
+1163,7 @@ class ClientImpl extends TcpDiscoveryImpl { if (joinLatch.getCount() > 0) { joinError(new IgniteSpiException("Join process timed out, did not receive response for " + "join request (consider increasing 'joinTimeout' configuration property) " + - "[joinTimeout=" + spi.joinTimeout + ", sock=" + sock + ']')); + "[joinTimeout=" + spi.getJoinTimeout() + ", sock=" + sock + ']')); break; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c3c0ef87/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java index ec6a526..c8f3942 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java @@ -1538,7 +1538,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, - GridByteArrayOutputStream bout) throws IOException, IgniteCheckedException { + GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { waitFor(writeLock); boolean fail = false; @@ -1556,17 +1556,18 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { sock.close(); } - super.writeToSocket(sock, msg, bout); + super.writeToSocket(sock, msg, bout, timeout); if (afterWrite != null) afterWrite.apply(msg, sock); } /** {@inheritDoc} */ - @Override protected Socket openSocket(InetSocketAddress sockAddr) throws IOException { + @Override protected Socket openSocket(InetSocketAddress sockAddr, + IgniteSpiOperationTimeoutController timeoutCtrl) throws 
IOException, IgniteSpiOperationTimeoutException { waitFor(openSockLock); - return super.openSocket(sockAddr); + return super.openSocket(sockAddr, new IgniteSpiOperationTimeoutController(this)); } /** @@ -1595,7 +1596,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ - @Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res) throws IOException { + @Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res, long timeout) + throws IOException { if (delayJoinAckFor != null && msg instanceof TcpDiscoveryJoinRequestMessage) { TcpDiscoveryJoinRequestMessage msg0 = (TcpDiscoveryJoinRequestMessage)msg; @@ -1613,7 +1615,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { } } - super.writeToSocket(msg, sock, res); + super.writeToSocket(msg, sock, res, timeout); } /** {@inheritDoc} */