Repository: incubator-ignite Updated Branches: refs/heads/ignite-752 c453ab8dc -> c399a828f
ignite-752: fixed the rest of review notes Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c399a828 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c399a828 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c399a828 Branch: refs/heads/ignite-752 Commit: c399a828f51644dc0991a4d73b195dc900a32ec5 Parents: c453ab8 Author: Denis Magda <dma...@gridgain.com> Authored: Wed Jul 22 08:54:48 2015 +0300 Committer: Denis Magda <dma...@gridgain.com> Committed: Wed Jul 22 08:54:48 2015 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/spi/IgniteSpiAdapter.java | 7 +- .../IgniteSpiOperationTimeoutController.java | 102 -------------- .../spi/IgniteSpiOperationTimeoutException.java | 2 +- .../spi/IgniteSpiOperationTimeoutHelper.java | 102 ++++++++++++++ .../communication/tcp/TcpCommunicationSpi.java | 20 +-- .../ignite/spi/discovery/tcp/ClientImpl.java | 12 +- .../ignite/spi/discovery/tcp/ServerImpl.java | 132 +++++++++---------- .../spi/discovery/tcp/TcpDiscoverySpi.java | 8 +- .../tcp/TcpClientDiscoverySpiSelfTest.java | 4 +- ...TcpDiscoverySpiFailureThresholdSelfTest.java | 10 +- 10 files changed, 202 insertions(+), 197 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c399a828/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java index 33a250d..5e8f061 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java @@ -599,7 +599,12 @@ public abstract class IgniteSpiAdapter 
implements IgniteSpi, IgniteSpiManagement if (failureDetectionThresholdEnabled) { failureDetectionThreshold = ignite.configuration().getFailureDetectionThreshold(); - assertParameter(failureDetectionThreshold > 0, "failureDetectionThreshold > 0"); + if (failureDetectionThreshold <= 0) + throw new IgniteSpiException("Invalid failure detection threshold value: " + failureDetectionThreshold); + else if (failureDetectionThreshold <= 10) + // Because U.currentTimeInMillis() is updated once in 10 milliseconds. + log.warning("Failure detection threshold is too low, it may lead to unpredictable behaviour " + + "[failureDetectionThreshold=" + failureDetectionThreshold + ']'); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c399a828/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java deleted file mode 100644 index 6213893..0000000 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ignite.spi; - -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.net.*; - -/** - * Object that incorporates logic that determines a timeout value for the next network related operation and checks - * whether a failure detection threshold is reached or not. - * - * A new instance of the class should be created for every complex network based operations that usually consists of - * request and response parts. - */ -public class IgniteSpiOperationTimeoutController { - /** */ - private long lastOperStartTs; - - /** */ - private long timeout; - - /** */ - private final boolean failureDetectionThresholdEnabled; - - /** */ - private final long failureDetectionThreshold; - - /** - * Constructor. - * - * @param adapter SPI adapter. - */ - public IgniteSpiOperationTimeoutController(IgniteSpiAdapter adapter) { - failureDetectionThresholdEnabled = adapter.failureDetectionThresholdEnabled(); - failureDetectionThreshold = adapter.failureDetectionThreshold(); - } - - /** - * Returns a timeout value to use for the next network operation. - * - * If failure detection threshold is enabled then the returned value is a portion of time left since the last time - * this method is called. If the threshold is disabled then {@code dfltTimeout} is returned. - * - * @param dfltTimeout Timeout to use if failure detection threshold is disabled. - * @return Timeout in milliseconds. - * @throws IgniteSpiOperationTimeoutException If failure detection threshold is reached for an operation that uses - * this {@code IgniteSpiOperationTimeoutController}. 
- */ - public long nextTimeoutChunk(long dfltTimeout) throws IgniteSpiOperationTimeoutException { - if (!failureDetectionThresholdEnabled) - return dfltTimeout; - - if (lastOperStartTs == 0) { - timeout = failureDetectionThreshold; - lastOperStartTs = U.currentTimeMillis(); - } - else { - long curTs = U.currentTimeMillis(); - - timeout = timeout - (curTs - lastOperStartTs); - - lastOperStartTs = curTs; - - if (timeout <= 0) - throw new IgniteSpiOperationTimeoutException("Network operation timed out. Increase " + - "'failureDetectionThreshold' configuration property or set SPI specific timeouts" + - " manually. Current failure detection threshold: " + failureDetectionThreshold); - } - - return timeout; - } - - /** - * Checks whether the given {@link Exception} is generated because failure detection threshold has been reached. - * - * @param e Exception. - * @return {@code true} if failure detection threshold is reached, {@code false} otherwise. - */ - public boolean checkFailureDetectionThresholdReached(Exception e) { - if (!failureDetectionThresholdEnabled) - return false; - - return e instanceof IgniteSpiOperationTimeoutException || e instanceof SocketTimeoutException || - X.hasCause(e, IgniteSpiOperationTimeoutException.class, SocketException.class); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c399a828/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java index 1ea05fd..235fd2b 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java @@ -27,7 +27,7 @@ import org.apache.ignite.configuration.*; * {@link 
TcpCommunicationSpi}. * * For more information refer to {@link IgniteConfiguration#setFailureDetectionThreshold(long)} and - * {@link IgniteSpiOperationTimeoutController}. + * {@link IgniteSpiOperationTimeoutHelper}. */ public class IgniteSpiOperationTimeoutException extends IgniteCheckedException { /** */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c399a828/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java new file mode 100644 index 0000000..03858d9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ignite.spi; + +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.net.*; + +/** + * Object that incorporates logic that determines a timeout value for the next network related operation and checks + * whether a failure detection threshold is reached or not. + * + * A new instance of the class should be created for every complex network based operation that usually consists of + * request and response parts. + */ +public class IgniteSpiOperationTimeoutHelper { + /** */ + private long lastOperStartTs; + + /** */ + private long timeout; + + /** */ + private final boolean failureDetectionThresholdEnabled; + + /** */ + private final long failureDetectionThreshold; + + /** + * Constructor. + * + * @param adapter SPI adapter. + */ + public IgniteSpiOperationTimeoutHelper(IgniteSpiAdapter adapter) { + failureDetectionThresholdEnabled = adapter.failureDetectionThresholdEnabled(); + failureDetectionThreshold = adapter.failureDetectionThreshold(); + } + + /** + * Returns a timeout value to use for the next network operation. + * + * If failure detection threshold is enabled then the returned value is a portion of time left since the last time + * this method is called. If the threshold is disabled then {@code dfltTimeout} is returned. + * + * @param dfltTimeout Timeout to use if failure detection threshold is disabled. + * @return Timeout in milliseconds. + * @throws IgniteSpiOperationTimeoutException If failure detection threshold is reached for an operation that uses + * this {@code IgniteSpiOperationTimeoutHelper}. 
+ */ + public long nextTimeoutChunk(long dfltTimeout) throws IgniteSpiOperationTimeoutException { + if (!failureDetectionThresholdEnabled) + return dfltTimeout; + + if (lastOperStartTs == 0) { + timeout = failureDetectionThreshold; + lastOperStartTs = U.currentTimeMillis(); + } + else { + long curTs = U.currentTimeMillis(); + + timeout = timeout - (curTs - lastOperStartTs); + + lastOperStartTs = curTs; + + if (timeout <= 0) + throw new IgniteSpiOperationTimeoutException("Network operation timed out. Increase " + + "'failureDetectionThreshold' configuration property or set SPI specific timeouts" + + " manually. Current failure detection threshold: " + failureDetectionThreshold); + } + + return timeout; + } + + /** + * Checks whether the given {@link Exception} is generated because failure detection threshold has been reached. + * + * @param e Exception. + * @return {@code true} if failure detection threshold is reached, {@code false} otherwise. + */ + public boolean checkThresholdReached(Exception e) { + if (!failureDetectionThresholdEnabled) + return false; + + return e instanceof IgniteSpiOperationTimeoutException || e instanceof SocketTimeoutException || + X.hasCause(e, IgniteSpiOperationTimeoutException.class, SocketException.class); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c399a828/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 1f09f05..b24d424 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -1941,7 +1941,7 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter long connTimeout0 = connTimeout; - IgniteSpiOperationTimeoutController timeoutCtrl = new IgniteSpiOperationTimeoutController(this); + IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(this); while (true) { GridCommunicationClient client; @@ -1949,12 +1949,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter try { client = new GridShmemCommunicationClient(metricsLsnr, port, - timeoutCtrl.nextTimeoutChunk(connTimeout), + timeoutHelper.nextTimeoutChunk(connTimeout), log, getSpiContext().messageFormatter()); } catch (IgniteCheckedException e) { - if (timeoutCtrl.checkFailureDetectionThresholdReached(e)) + if (timeoutHelper.checkThresholdReached(e)) throw e; // Reconnect for the second time, if connection is not established. @@ -1968,13 +1968,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } try { - safeHandshake(client, null, node.id(), timeoutCtrl.nextTimeoutChunk(connTimeout0)); + safeHandshake(client, null, node.id(), timeoutHelper.nextTimeoutChunk(connTimeout0)); } catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) { client.forceClose(); if (failureDetectionThresholdEnabled() && (e instanceof HandshakeTimeoutException || - timeoutCtrl.checkFailureDetectionThresholdReached(e))) { + timeoutHelper.checkThresholdReached(e))) { log.debug("Handshake timed out (failure threshold reached) [failureDetectionThreshold=" + failureDetectionThreshold() + ", err=" + e.getMessage() + ", client=" + client + ']'); @@ -2100,7 +2100,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter int attempt = 1; - IgniteSpiOperationTimeoutController timeoutCtrl = new IgniteSpiOperationTimeoutController(this); + IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(this); while (!conn) { // Reconnection on handshake timeout. 
try { @@ -2128,9 +2128,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter long rcvCnt = -1; try { - ch.socket().connect(addr, (int)timeoutCtrl.nextTimeoutChunk(connTimeout)); + ch.socket().connect(addr, (int)timeoutHelper.nextTimeoutChunk(connTimeout)); - rcvCnt = safeHandshake(ch, recoveryDesc, node.id(), timeoutCtrl.nextTimeoutChunk(connTimeout0)); + rcvCnt = safeHandshake(ch, recoveryDesc, node.id(), timeoutHelper.nextTimeoutChunk(connTimeout0)); if (rcvCnt == -1) return null; @@ -2172,7 +2172,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } if (failureDetectionThresholdEnabled() && (e instanceof HandshakeTimeoutException || - timeoutCtrl.checkFailureDetectionThresholdReached(e))) { + timeoutHelper.checkThresholdReached(e))) { String msg = "Handshake timed out (failure detection threshold is reached) " + "[failureDetectionThreshold=" + failureDetectionThreshold() + ", addr=" + addr + ']'; @@ -2240,7 +2240,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (log.isDebugEnabled()) log.debug("Client creation failed [addr=" + addr + ", err=" + e + ']'); - boolean failureDetThrReached = timeoutCtrl.checkFailureDetectionThresholdReached(e); + boolean failureDetThrReached = timeoutHelper.checkThresholdReached(e); if (failureDetThrReached) LT.warn(log, null, "Connect timed out (consider increasing 'failureDetectionThreshold' " + http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c399a828/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 196c1b3..e0d1741 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -488,7 +488,7 
@@ class ClientImpl extends TcpDiscoveryImpl { UUID locNodeId = getLocalNodeId(); - IgniteSpiOperationTimeoutController timeoutCtrl = new IgniteSpiOperationTimeoutController(spi); + IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi); while (true) { boolean openSock = false; @@ -498,7 +498,7 @@ class ClientImpl extends TcpDiscoveryImpl { try { long tstamp = U.currentTimeMillis(); - sock = spi.openSocket(addr, timeoutCtrl); + sock = spi.openSocket(addr, timeoutHelper); openSock = true; @@ -506,7 +506,7 @@ class ClientImpl extends TcpDiscoveryImpl { req.client(true); - spi.writeToSocket(sock, req, timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout())); + spi.writeToSocket(sock, req, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0); @@ -536,7 +536,7 @@ class ClientImpl extends TcpDiscoveryImpl { msg.client(true); - spi.writeToSocket(sock, msg, timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout())); + spi.writeToSocket(sock, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp); @@ -544,7 +544,7 @@ class ClientImpl extends TcpDiscoveryImpl { log.debug("Message has been sent to address [msg=" + msg + ", addr=" + addr + ", rmtNodeId=" + rmtNodeId + ']'); - return new T3<>(sock, spi.readReceipt(sock, timeoutCtrl.nextTimeoutChunk(ackTimeout0)), res.clientAck()); + return new T3<>(sock, spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)), res.clientAck()); } catch (IOException | IgniteCheckedException e) { U.closeQuiet(sock); @@ -559,7 +559,7 @@ class ClientImpl extends TcpDiscoveryImpl { errs.add(e); - if (timeoutCtrl.checkFailureDetectionThresholdReached(e)) + if (timeoutHelper.checkThresholdReached(e)) break; if (!spi.failureDetectionThresholdEnabled() && ++reconCnt == spi.getReconnectCount()) 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c399a828/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 2a09c62..1f98ba8 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -462,7 +462,7 @@ class ServerImpl extends TcpDiscoveryImpl { UUID locNodeId = getLocalNodeId(); - IgniteSpiOperationTimeoutController timeoutCtrl = new IgniteSpiOperationTimeoutController(spi); + IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi); if (F.contains(spi.locNodeAddrs, addr)) { if (clientNodeId == null) @@ -476,7 +476,7 @@ class ServerImpl extends TcpDiscoveryImpl { boolean clientPingRes; try { - clientPingRes = clientWorker.ping(timeoutCtrl); + clientPingRes = clientWorker.ping(timeoutHelper); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -508,12 +508,12 @@ class ServerImpl extends TcpDiscoveryImpl { long tstamp = U.currentTimeMillis(); - sock = spi.openSocket(addr, timeoutCtrl); + sock = spi.openSocket(addr, timeoutHelper); spi.writeToSocket(sock, new TcpDiscoveryPingRequest(locNodeId, clientNodeId), - timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout())); + timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); - TcpDiscoveryPingResponse res = spi.readMessage(sock, null, timeoutCtrl.nextTimeoutChunk( + TcpDiscoveryPingResponse res = spi.readMessage(sock, null, timeoutHelper.nextTimeoutChunk( spi.getAckTimeout())); if (locNodeId.equals(res.creatorNodeId())) { @@ -537,7 +537,7 @@ class ServerImpl extends TcpDiscoveryImpl { errs.add(e); - if (timeoutCtrl.checkFailureDetectionThresholdReached(e)) + if 
(timeoutHelper.checkThresholdReached(e)) break; else if (!spi.failureDetectionThresholdEnabled() && ++reconCnt == spi.getReconnectCount()) break; @@ -600,14 +600,8 @@ class ServerImpl extends TcpDiscoveryImpl { /** {@inheritDoc} */ @Override protected void onDataReceived() { - if (spi.failureDetectionThresholdEnabled()) { - if (locNode != null) - locNode.lastDataReceivedTime(U.currentTimeMillis()); - - if (msgWorker != null) - // Node receives messages from remote nodes, reset this flag. - msgWorker.failureDetectionNotified = false; - } + if (spi.failureDetectionThresholdEnabled() && locNode != null) + locNode.lastDataReceivedTime(U.currentTimeMillis()); } /** @@ -894,7 +888,7 @@ class ServerImpl extends TcpDiscoveryImpl { UUID locNodeId = getLocalNodeId(); - IgniteSpiOperationTimeoutController timeoutCtrl = new IgniteSpiOperationTimeoutController(spi); + IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi); int reconCnt = 0; @@ -910,15 +904,15 @@ class ServerImpl extends TcpDiscoveryImpl { try { long tstamp = U.currentTimeMillis(); - sock = spi.openSocket(addr, timeoutCtrl); + sock = spi.openSocket(addr, timeoutHelper); openSock = true; // Handshake. - spi.writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId), timeoutCtrl.nextTimeoutChunk( + spi.writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId), timeoutHelper.nextTimeoutChunk( spi.getSocketTimeout())); - TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, timeoutCtrl.nextTimeoutChunk( + TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, timeoutHelper.nextTimeoutChunk( ackTimeout0)); if (locNodeId.equals(res.creatorNodeId())) { @@ -933,7 +927,7 @@ class ServerImpl extends TcpDiscoveryImpl { // Send message. 
tstamp = U.currentTimeMillis(); - spi.writeToSocket(sock, msg, timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout())); + spi.writeToSocket(sock, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp); @@ -950,7 +944,7 @@ class ServerImpl extends TcpDiscoveryImpl { // E.g. due to class not found issue. joinReqSent = msg instanceof TcpDiscoveryJoinRequestMessage; - return spi.readReceipt(sock, timeoutCtrl.nextTimeoutChunk(ackTimeout0)); + return spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)); } catch (ClassCastException e) { // This issue is rarely reproducible on AmazonEC2, but never @@ -976,7 +970,7 @@ class ServerImpl extends TcpDiscoveryImpl { errs.add(e); - if (timeoutCtrl.checkFailureDetectionThresholdReached(e)) + if (timeoutHelper.checkThresholdReached(e)) break; if (!spi.failureDetectionThresholdEnabled() && ++reconCnt == spi.getReconnectCount()) @@ -1774,8 +1768,8 @@ class ServerImpl extends TcpDiscoveryImpl { /** Time when the last status message has been sent. */ private long lastTimeConnCheckMsgSent; - /** Whether an error message has been printed out when failure detection threshold is reached. */ - private volatile boolean failureDetectionNotified; + /** Flag that keeps info on whether the threshold is reached or not. */ + private boolean failureThresholdReached; /** Last time hearbeat message has been sent. */ private long lastTimeHbMsgSent; @@ -1783,7 +1777,7 @@ class ServerImpl extends TcpDiscoveryImpl { /** */ protected RingMessageWorker() { - super("tcp-disco-msg-worker"); + super("tcp-disco-msg-worker", 10); } /** @@ -1843,6 +1837,10 @@ class ServerImpl extends TcpDiscoveryImpl { if (spi.ensured(msg)) msgHist.add(msg); + if (msg.senderNodeId() != null && !msg.senderNodeId().equals(getLocalNodeId())) + // Reset the flag. 
+ failureThresholdReached = false; + spi.stats.onMessageProcessingFinished(msg); } @@ -1921,16 +1919,11 @@ class ServerImpl extends TcpDiscoveryImpl { if (debugMode) debugLog("No next node in topology."); - if (ring.hasRemoteNodes()) { + /*if (ring.hasRemoteNodes()) { msg.senderNodeId(locNodeId); - if (msg instanceof TcpDiscoveryConnectionCheckMessage || - (msg instanceof TcpDiscoveryStatusCheckMessage && - ((TcpDiscoveryStatusCheckMessage)msg).replacedConnCheckMsg())) - break; - addMessage(msg); - } + }*/ break; } @@ -1975,12 +1968,12 @@ class ServerImpl extends TcpDiscoveryImpl { int reconCnt = 0; - IgniteSpiOperationTimeoutController timeoutCtrl = null; + IgniteSpiOperationTimeoutHelper timeoutHelper = null; while (true) { if (sock == null) { - if (timeoutCtrl == null) - timeoutCtrl = new IgniteSpiOperationTimeoutController(spi); + if (timeoutHelper == null) + timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi); nextNodeExists = false; @@ -1992,16 +1985,16 @@ class ServerImpl extends TcpDiscoveryImpl { try { long tstamp = U.currentTimeMillis(); - sock = spi.openSocket(addr, timeoutCtrl); + sock = spi.openSocket(addr, timeoutHelper); openSock = true; // Handshake. 
writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId), - timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout())); + timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, - timeoutCtrl.nextTimeoutChunk(ackTimeout0)); + timeoutHelper.nextTimeoutChunk(ackTimeout0)); if (locNodeId.equals(res.creatorNodeId())) { if (log.isDebugEnabled()) @@ -2088,7 +2081,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (!spi.failureDetectionThresholdEnabled() && ++reconCnt == spi.getReconnectCount()) break; - if (timeoutCtrl.checkFailureDetectionThresholdReached(e)) + if (timeoutHelper.checkThresholdReached(e)) break; else if (!spi.failureDetectionThresholdEnabled() && (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class))) { @@ -2111,7 +2104,7 @@ class ServerImpl extends TcpDiscoveryImpl { nextNodeExists = true; // Resetting timeout control object to let the code below to use a new one // for the next bunch of operations. 
- timeoutCtrl = null; + timeoutHelper = null; } } } @@ -2149,11 +2142,11 @@ class ServerImpl extends TcpDiscoveryImpl { prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId); - if (timeoutCtrl == null) - timeoutCtrl = new IgniteSpiOperationTimeoutController(spi); + if (timeoutHelper == null) + timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi); try { - writeToSocket(sock, pendingMsg, timeoutCtrl.nextTimeoutChunk( + writeToSocket(sock, pendingMsg, timeoutHelper.nextTimeoutChunk( spi.getSocketTimeout())); } finally { @@ -2162,7 +2155,7 @@ class ServerImpl extends TcpDiscoveryImpl { spi.stats.onMessageSent(pendingMsg, U.currentTimeMillis() - tstamp); - int res = spi.readReceipt(sock, timeoutCtrl.nextTimeoutChunk(ackTimeout0)); + int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)); if (log.isDebugEnabled()) log.debug("Pending message has been sent to next node [msg=" + msg.id() + @@ -2176,7 +2169,7 @@ class ServerImpl extends TcpDiscoveryImpl { // Resetting timeout control object to create a new one for the next bunch of // operations. 
- timeoutCtrl = null; + timeoutHelper = null; } } @@ -2185,14 +2178,14 @@ class ServerImpl extends TcpDiscoveryImpl { try { long tstamp = U.currentTimeMillis(); - if (timeoutCtrl == null) - timeoutCtrl = new IgniteSpiOperationTimeoutController(spi); + if (timeoutHelper == null) + timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi); - writeToSocket(sock, msg, timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout())); + writeToSocket(sock, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp); - int res = spi.readReceipt(sock, timeoutCtrl.nextTimeoutChunk(ackTimeout0)); + int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)); if (log.isDebugEnabled()) log.debug("Message has been sent to next node [msg=" + msg + @@ -2227,7 +2220,7 @@ class ServerImpl extends TcpDiscoveryImpl { onException("Failed to send message to next node [next=" + next.id() + ", msg=" + msg + ']', e); - if (timeoutCtrl.checkFailureDetectionThresholdReached(e)) + if (timeoutHelper.checkThresholdReached(e)) break; if (!spi.failureDetectionThresholdEnabled()) { @@ -4045,14 +4038,17 @@ class ServerImpl extends TcpDiscoveryImpl { * Check connection aliveness status. 
*/ private void checkConnection() { - if (!failureDetectionNotified && U.currentTimeMillis() - locNode.lastDataReceivedTime() + if (!spi.failureDetectionThresholdEnabled()) + return; + + if (!failureThresholdReached && U.currentTimeMillis() - locNode.lastDataReceivedTime() >= spi.failureDetectionThreshold() && ring.hasRemoteNodes() && spiStateCopy() == CONNECTED) { log.info("Local node seems to be disconnected from topology (failure detection threshold " + "is reached): [failureDetectionThreshold=" + spi.failureDetectionThreshold() + ", connCheckFreq=" + spi.connCheckFreq + ']'); - failureDetectionNotified = true; + failureThresholdReached = true; } long elapsed = (lastTimeConnCheckMsgSent + spi.connCheckFreq) - U.currentTimeMillis(); @@ -4261,17 +4257,17 @@ class ServerImpl extends TcpDiscoveryImpl { TcpDiscoveryPingResponse res = new TcpDiscoveryPingResponse(locNodeId); - IgniteSpiOperationTimeoutController timeoutCtrl = - new IgniteSpiOperationTimeoutController(spi); + IgniteSpiOperationTimeoutHelper timeoutHelper = + new IgniteSpiOperationTimeoutHelper(spi); if (req.clientNodeId() != null) { ClientMessageWorker clientWorker = clientMsgWorkers.get(req.clientNodeId()); if (clientWorker != null) - res.clientExists(clientWorker.ping(timeoutCtrl)); + res.clientExists(clientWorker.ping(timeoutHelper)); } - spi.writeToSocket(sock, res, timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout())); + spi.writeToSocket(sock, res, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); } else if (log.isDebugEnabled()) log.debug("Ignore ping request, node is stopping."); @@ -4831,7 +4827,7 @@ class ServerImpl extends TcpDiscoveryImpl { * @param clientNodeId Node ID. 
*/ protected ClientMessageWorker(Socket sock, UUID clientNodeId) { - super("tcp-disco-client-message-worker"); + super("tcp-disco-client-message-worker", 2000); this.sock = sock; this.clientNodeId = clientNodeId; @@ -4928,11 +4924,11 @@ class ServerImpl extends TcpDiscoveryImpl { } /** - * @param timeoutCtrl Timeout controller. + * @param timeoutHelper Timeout helper. * @return Ping result. * @throws InterruptedException If interrupted. */ - public boolean ping(IgniteSpiOperationTimeoutController timeoutCtrl) throws InterruptedException { + public boolean ping(IgniteSpiOperationTimeoutHelper timeoutHelper) throws InterruptedException { if (spi.isNodeStopping0()) return false; @@ -4958,7 +4954,7 @@ class ServerImpl extends TcpDiscoveryImpl { } try { - return fut.get(timeoutCtrl.nextTimeoutChunk(spi.getAckTimeout()), + return fut.get(timeoutHelper.nextTimeoutChunk(spi.getAckTimeout()), TimeUnit.MILLISECONDS); } catch (IgniteInterruptedCheckedException ignored) { @@ -4998,12 +4994,18 @@ class ServerImpl extends TcpDiscoveryImpl { /** Backed interrupted flag. */ private volatile boolean interrupted; + /** Polling timeout. */ + private final long pollingTimeout; + /** * @param name Thread name. + * @param pollingTimeout Messages polling timeout. 
*/ - protected MessageWorkerAdapter(String name) { + protected MessageWorkerAdapter(String name, long pollingTimeout) { super(spi.ignite().name(), name, log); + this.pollingTimeout = pollingTimeout; + setPriority(spi.threadPri); } @@ -5013,14 +5015,12 @@ class ServerImpl extends TcpDiscoveryImpl { log.debug("Message worker started [locNodeId=" + getConfiguredNodeId() + ']'); while (!isInterrupted()) { - TcpDiscoveryAbstractMessage msg = queue.poll(10, TimeUnit.MILLISECONDS); + TcpDiscoveryAbstractMessage msg = queue.poll(pollingTimeout, TimeUnit.MILLISECONDS); - if (msg == null) { + if (msg == null) noMessageLoop(); - continue; - } - - processMessage(msg); + else + processMessage(msg); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c399a828/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 87848d4..588ff98 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -1178,11 +1178,11 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T /** * @param sockAddr Remote address. - * @param timeoutCtrl Timeout controller. + * @param timeoutHelper Timeout helper. * @return Opened socket. * @throws IOException If failed. 
*/ - protected Socket openSocket(InetSocketAddress sockAddr, IgniteSpiOperationTimeoutController timeoutCtrl) + protected Socket openSocket(InetSocketAddress sockAddr, IgniteSpiOperationTimeoutHelper timeoutHelper) throws IOException, IgniteSpiOperationTimeoutException { assert sockAddr != null; @@ -1199,9 +1199,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T sock.setTcpNoDelay(true); - sock.connect(resolved, (int)timeoutCtrl.nextTimeoutChunk(sockTimeout)); + sock.connect(resolved, (int)timeoutHelper.nextTimeoutChunk(sockTimeout)); - writeToSocket(sock, U.IGNITE_HEADER, timeoutCtrl.nextTimeoutChunk(sockTimeout)); + writeToSocket(sock, U.IGNITE_HEADER, timeoutHelper.nextTimeoutChunk(sockTimeout)); return sock; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c399a828/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java index 994b7b5..a50b060 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java @@ -2123,10 +2123,10 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override protected Socket openSocket(InetSocketAddress sockAddr, - IgniteSpiOperationTimeoutController timeoutCtrl) throws IOException, IgniteSpiOperationTimeoutException { + IgniteSpiOperationTimeoutHelper timeoutHelper) throws IOException, IgniteSpiOperationTimeoutException { waitFor(openSockLock); - return super.openSocket(sockAddr, new IgniteSpiOperationTimeoutController(this)); + return super.openSocket(sockAddr, new 
IgniteSpiOperationTimeoutHelper(this)); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c399a828/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java index 362be15..1ee839c 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java @@ -308,15 +308,15 @@ public class TcpDiscoverySpiFailureThresholdSelfTest extends AbstractDiscoverySe /** {@inheritDoc} */ - @Override protected Socket openSocket(InetSocketAddress sockAddr, - IgniteSpiOperationTimeoutController timeoutCtrl) throws IOException, IgniteSpiOperationTimeoutException { + @Override protected Socket openSocket(InetSocketAddress sockAddr, IgniteSpiOperationTimeoutHelper timeoutHelper) + throws IOException, IgniteSpiOperationTimeoutException { if (openSocketTimeout) { err = new IgniteSpiOperationTimeoutException("Timeout: openSocketTimeout"); throw err; } else if (openSocketTimeoutWait) { - long timeout = timeoutCtrl.nextTimeoutChunk(0); + long timeout = timeoutHelper.nextTimeoutChunk(0); try { Thread.sleep(timeout + 1000); @@ -326,14 +326,14 @@ public class TcpDiscoverySpiFailureThresholdSelfTest extends AbstractDiscoverySe } try { - timeoutCtrl.nextTimeoutChunk(0); + timeoutHelper.nextTimeoutChunk(0); } catch (IgniteSpiOperationTimeoutException e) { throw (err = new IgniteSpiOperationTimeoutException("Timeout: openSocketTimeoutWait")); } } - Socket sock = super.openSocket(sockAddr, timeoutCtrl); + Socket sock = super.openSocket(sockAddr, timeoutHelper); try { 
Thread.sleep(1500);