Repository: incubator-ignite Updated Branches: refs/heads/ignite-752 6bb1875fd -> 347eb70c8
ignite-752: implemented connection check message Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/347eb70c Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/347eb70c Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/347eb70c Branch: refs/heads/ignite-752 Commit: 347eb70c8d95b8922bc1a79a6dc9e791b56c9ba5 Parents: 6bb1875 Author: Denis Magda <dma...@gridgain.com> Authored: Fri Jul 17 21:59:04 2015 +0300 Committer: Denis Magda <dma...@gridgain.com> Committed: Fri Jul 17 21:59:04 2015 +0300 ---------------------------------------------------------------------- .../ignite/spi/discovery/tcp/ServerImpl.java | 190 +++++++++++++++++-- .../spi/discovery/tcp/TcpDiscoveryImpl.java | 10 + .../spi/discovery/tcp/TcpDiscoverySpi.java | 40 +++- .../tcp/internal/TcpDiscoveryNode.java | 19 ++ .../TcpDiscoveryConnectionCheckMessage.java | 45 +++++ 5 files changed, 291 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/347eb70c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 3a50c31..74c7dbd 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -88,6 +88,9 @@ class ServerImpl extends TcpDiscoveryImpl { @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") private CheckStatusSender chkStatusSnd; + /** Connection checker. */ + private CheckConnectionWorker chkConnWorker; + /** IP finder cleaner. 
*/ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") private IpFinderCleaner ipFinderCleaner; @@ -232,8 +235,14 @@ class ServerImpl extends TcpDiscoveryImpl { hbsSnd = new HeartbeatsSender(); hbsSnd.start(); - chkStatusSnd = new CheckStatusSender(); - chkStatusSnd.start(); + if (spi.failureDetectionThresholdEnabled()) { + chkConnWorker = new CheckConnectionWorker(); + chkConnWorker.start(); + } + else { + chkStatusSnd = new CheckStatusSender(); + chkStatusSnd.start(); + } if (spi.ipFinder.isShared()) { ipFinderCleaner = new IpFinderCleaner(); @@ -323,8 +332,14 @@ class ServerImpl extends TcpDiscoveryImpl { U.interrupt(hbsSnd); U.join(hbsSnd, log); - U.interrupt(chkStatusSnd); - U.join(chkStatusSnd, log); + if (spi.failureDetectionThresholdEnabled()) { + U.interrupt(chkConnWorker); + U.join(chkConnWorker, log); + } + else { + U.interrupt(chkStatusSnd); + U.join(chkStatusSnd, log); + } U.interrupt(ipFinderCleaner); U.join(ipFinderCleaner, log); @@ -619,6 +634,14 @@ class ServerImpl extends TcpDiscoveryImpl { } } + /** {@inheritDoc} */ + @Override protected void onDataRead() { + if (spi.failureDetectionThresholdEnabled()) { + locNode.lastDataReceivedTime(U.currentTimeMillis()); + chkConnWorker.reset(); + } + } + /** * Tries to join this node to topology. 
* @@ -1287,8 +1310,14 @@ class ServerImpl extends TcpDiscoveryImpl { U.interrupt(hbsSnd); U.join(hbsSnd, log); - U.interrupt(chkStatusSnd); - U.join(chkStatusSnd, log); + if (spi.failureDetectionThresholdEnabled()) { + U.interrupt(chkConnWorker); + U.join(chkConnWorker, log); + } + else { + U.interrupt(chkStatusSnd); + U.join(chkStatusSnd, log); + } U.interrupt(ipFinderCleaner); U.join(ipFinderCleaner, log); @@ -1378,7 +1407,12 @@ class ServerImpl extends TcpDiscoveryImpl { b.append("Internal threads: ").append(U.nl()); b.append(" Message worker: ").append(threadStatus(msgWorker)).append(U.nl()); - b.append(" Check status sender: ").append(threadStatus(chkStatusSnd)).append(U.nl()); + + if (spi.failureDetectionThresholdEnabled()) + b.append(" Check connectino worker: ").append(threadStatus(chkConnWorker)).append(U.nl()); + else + b.append(" Check status sender: ").append(threadStatus(chkStatusSnd)).append(U.nl()); + b.append(" HB sender: ").append(threadStatus(hbsSnd)).append(U.nl()); b.append(" IP finder cleaner: ").append(threadStatus(ipFinderCleaner)).append(U.nl()); b.append(" Stats printer: ").append(threadStatus(statsPrinter)).append(U.nl()); @@ -1426,7 +1460,8 @@ class ServerImpl extends TcpDiscoveryImpl { private boolean recordable(TcpDiscoveryAbstractMessage msg) { return !(msg instanceof TcpDiscoveryHeartbeatMessage) && !(msg instanceof TcpDiscoveryStatusCheckMessage) && - !(msg instanceof TcpDiscoveryDiscardMessage); + !(msg instanceof TcpDiscoveryDiscardMessage) && + !(msg instanceof TcpDiscoveryConnectionCheckMessage); } /** @@ -1568,6 +1603,82 @@ class ServerImpl extends TcpDiscoveryImpl { } /** + * TODO: IGNITE-752 + */ + private class CheckConnectionWorker extends IgniteSpiThread { + /** */ + private volatile boolean msgInQueue; + + /** */ + private volatile boolean logMsgPrinted; + + /** + * Constructor + */ + public CheckConnectionWorker() { + super(spi.ignite().name(), "tcp-disco-conn-check-worker", log); + + setPriority(spi.threadPri); + } + + 
/** {@inheritDoc} */ + @Override protected void body() throws InterruptedException { + if (log.isDebugEnabled()) + log.debug("Connection check worker has been started."); + + while (!isInterrupted()) { + if (spiStateCopy() != CONNECTED) { + if (log.isDebugEnabled()) + log.debug("Stopping connection check worker (SPI is not connected to topology)."); + + return; + } + + if (U.currentTimeMillis() - locNode.lastDataReceivedTime() >= spi.failureDetectionThreshold() && + ring.hasRemoteNodes() && spiStateCopy() == CONNECTED) { + + if (!logMsgPrinted) { + log.info("Local node seems to be disconnected from topology (failure detection threshold " + + "is reached): [failureDetectionThreshold=" + spi.failureDetectionThreshold() + + ", connCheckFreq=" + spi.connCheckFreq + ']'); + + logMsgPrinted = true; + } + } + + if (msgInQueue) { + Thread.sleep(spi.connCheckFreq); + + continue; + } + + if (ring.hasRemoteNodes()) { + // Send the message using ring message worker in order to reuse an existing socket to the next node. + msgInQueue = true; + + msgWorker.addMessage(new TcpDiscoveryConnectionCheckMessage(locNode)); + } + + Thread.sleep(spi.connCheckFreq); + } + } + + /** + * Clears the "already logged" flag so the threshold-reached message can be printed again. + */ + private void reset() { + logMsgPrinted = false; + } + + /** + * Marks the queued connection check message as processed so that the worker may enqueue a new one. + */ + private void messageProcessed() { + msgInQueue = false; + } + } + + /** * Thread that cleans IP finder and keeps it in the correct state, unregistering * addresses of the nodes that has left the topology. 
* <p> @@ -1910,6 +2021,9 @@ class ServerImpl extends TcpDiscoveryImpl { if (msg instanceof TcpDiscoveryJoinRequestMessage) processJoinRequestMessage((TcpDiscoveryJoinRequestMessage)msg); + else if (msg instanceof TcpDiscoveryConnectionCheckMessage) + processConnectionCheckMessage((TcpDiscoveryConnectionCheckMessage)msg); + else if (msg instanceof TcpDiscoveryClientReconnectMessage) processClientReconnectMessage((TcpDiscoveryClientReconnectMessage)msg); @@ -1956,9 +2070,10 @@ class ServerImpl extends TcpDiscoveryImpl { * Sends message across the ring. * * @param msg Message to send + * @return Receipt code read from the next node; initialized to {@code RES_FAIL} before any receipt is read. */ @SuppressWarnings({"BreakStatementWithLabel", "LabeledStatement", "ContinueStatementWithLabel"}) - private void sendMessageAcrossRing(TcpDiscoveryAbstractMessage msg) { + private int sendMessageAcrossRing(TcpDiscoveryAbstractMessage msg) { assert msg != null; assert ring.hasRemoteNodes(); @@ -2007,6 +2122,8 @@ class ServerImpl extends TcpDiscoveryImpl { UUID locNodeId = getLocalNodeId(); + int msgRes = RES_FAIL; + while (true) { if (searchNext) { TcpDiscoveryNode newNext = ring.nextNode(failedNodes); @@ -2272,6 +2389,16 @@ class ServerImpl extends TcpDiscoveryImpl { } } + if (msg instanceof TcpDiscoveryConnectionCheckMessage && !next.version().greaterThanEqual( + TcpDiscoverySpi.FAILURE_DETECTION_MAJOR_VER, + TcpDiscoverySpi.FAILURE_DETECTION_MINOR_VER, + TcpDiscoverySpi.FAILURE_DETECTION_MAINT_VER)) { + // Preserve backward compatibility with nodes of older versions. 
+ assert msg.creatorNodeId().equals(getLocalNodeId()); + + msg = new TcpDiscoveryStatusCheckMessage(locNode, null); + } + prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId); try { @@ -2284,17 +2411,17 @@ class ServerImpl extends TcpDiscoveryImpl { spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp); - int res = spi.readReceipt(sock, timeoutCtrl.nextTimeoutChunk(ackTimeout0)); + msgRes = spi.readReceipt(sock, timeoutCtrl.nextTimeoutChunk(ackTimeout0)); if (log.isDebugEnabled()) log.debug("Message has been sent to next node [msg=" + msg + ", next=" + next.id() + - ", res=" + res + ']'); + ", res=" + msgRes + ']'); if (debugMode) debugLog("Message has been sent to next node [msg=" + msg + ", next=" + next.id() + - ", res=" + res + ']'); + ", res=" + msgRes + ']'); } finally { clearNodeAddedMessage(msg); @@ -2421,6 +2548,8 @@ class ServerImpl extends TcpDiscoveryImpl { "To speed up failure detection please see 'Failure Detection' section under javadoc" + " for 'TcpDiscoverySpi'"); } + + return msgRes; } /** @@ -3847,6 +3976,35 @@ class ServerImpl extends TcpDiscoveryImpl { } /** + * Sends the locally created connection check message across the ring and fires {@code EVT_NODE_SEGMENTED} if the reply is {@code STATUS_RECON}. + * @param msg Connection check message created by the local node. + */ + private void processConnectionCheckMessage(TcpDiscoveryConnectionCheckMessage msg) { + assert msg.creatorNodeId().equals(getLocalNodeId()) && msg.senderNodeId() == null; + + if (spiStateCopy() != CONNECTED) { + if (log.isDebugEnabled()) + log.debug("Connection check message discarded (local node receives updates)."); + + chkConnWorker.messageProcessed(); + return; + } + + int res = RES_FAIL; + + if (ring.hasRemoteNodes()) + res = sendMessageAcrossRing(msg); + + chkConnWorker.messageProcessed(); + + if (res == TcpDiscoveryConnectionCheckMessage.STATUS_RECON) { + U.warn(log, "Node is out of topology (probably, due to short-time network problems)."); + + notifyDiscovery(EVT_NODE_SEGMENTED, ring.topologyVersion(), locNode); + } + } + + /** * @param nodeId Node ID. * @param metrics Metrics. 
* @param cacheMetrics Cache metrics. @@ -4261,6 +4419,8 @@ class ServerImpl extends TcpDiscoveryImpl { return; } + + // Handshake. TcpDiscoveryHandshakeRequest req = (TcpDiscoveryHandshakeRequest)msg; @@ -4418,6 +4578,12 @@ class ServerImpl extends TcpDiscoveryImpl { break; } } + else if (msg instanceof TcpDiscoveryConnectionCheckMessage) { + spi.writeToSocket(msg, sock, ring.node(msg.creatorNodeId()) != null ? RES_OK : + TcpDiscoveryConnectionCheckMessage.STATUS_RECON, socketTimeout); + + continue; + } else if (msg instanceof TcpDiscoveryClientReconnectMessage) { if (clientMsgWrk != null) { TcpDiscoverySpiState state = spiStateCopy(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/347eb70c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java index cf113a2..85b1f38 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java @@ -37,6 +37,9 @@ abstract class TcpDiscoveryImpl { /** Response OK. */ protected static final int RES_OK = 1; + /** Response FAIL. */ + protected static final int RES_FAIL = -1; + /** Response CONTINUE JOIN. */ protected static final int RES_CONTINUE_JOIN = 100; @@ -131,6 +134,13 @@ abstract class TcpDiscoveryImpl { } /** + * Callback invoked by the SPI after a message or a receipt has been read from a socket; does nothing by default. + */ + protected void onDataRead() { + // No-op + } + + /** * @param log Logger. 
*/ public abstract void dumpDebugInfo(IgniteLogger log); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/347eb70c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 8d99bcd..fcba8c6 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -157,6 +157,15 @@ import java.util.concurrent.atomic.*; @DiscoverySpiOrderSupport(true) @DiscoverySpiHistorySupport(true) public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, TcpDiscoverySpiMBean { + /** Failure detection threshold feature major version. */ + final static int FAILURE_DETECTION_MAJOR_VER = 1; + + /** Failure detection threshold feature minor version. */ + final static int FAILURE_DETECTION_MINOR_VER = 3; + + /** Failure detection threshold feature maintenance version. */ + final static int FAILURE_DETECTION_MAINT_VER = 1; + /** Node attribute that is mapped to node's external addresses (value is <tt>disc.tcp.ext-addrs</tt>). */ public static final String ATTR_EXT_ADDRS = "disc.tcp.ext-addrs"; @@ -202,6 +211,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T /** Default max heartbeats count node can miss without failing client node (value is <tt>5</tt>). */ public static final int DFLT_MAX_MISSED_CLIENT_HEARTBEATS = 5; + /** Default connection check frequency in milliseconds (value is <tt>1000</tt>). */ + public static final int DFLT_CONN_CHECK_FREQ = 1000; + /** Default IP finder clean frequency in milliseconds (value is <tt>60,000ms</tt>). 
*/ public static final long DFLT_IP_FINDER_CLEAN_FREQ = 60 * 1000; @@ -242,6 +254,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T /** Size of topology snapshots history. */ protected int topHistSize = DFLT_TOP_HISTORY_SIZE; + /** Connection check frequency. Used in conjunction with failure detection threshold. */ + protected long connCheckFreq = DFLT_CONN_CHECK_FREQ; + /** Grid discovery listener. */ protected volatile DiscoverySpiListener lsnr; @@ -828,6 +843,19 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T } /** + * Sets the connection check frequency, which is used in conjunction with the failure detection threshold. + * + * @param connCheckFreq Connection check frequency in milliseconds. + * @return {@code this} for chaining. + */ + @IgniteSpiConfiguration(optional = true) + public TcpDiscoverySpi setConnectionCheckFrequency(long connCheckFreq) { + this.connCheckFreq = connCheckFreq; + + return this; + } + + /** * @return Size of topology snapshots history. */ public long getTopHistorySize() { @@ -1332,7 +1360,11 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T try { sock.setSoTimeout((int)timeout); - return marsh.unmarshal(in == null ? sock.getInputStream() : in, U.gridClassLoader()); + T res = marsh.unmarshal(in == null ? 
sock.getInputStream() : in, U.gridClassLoader()); + + impl.onDataRead(); + + return res; } catch (IOException | IgniteCheckedException e) { if (X.hasCause(e, SocketTimeoutException.class)) @@ -1373,6 +1405,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T if (res == -1) throw new EOFException(); + impl.onDataRead(); + return res; } catch (SocketTimeoutException e) { @@ -1648,6 +1682,10 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T log.debug(configInfo("threadPri", threadPri)); if (!failureDetectionThresholdEnabled()) { + log.debug("Failure detection threshold is disabled and connection check frequency is ignored because " + + "at least one of the parameters from this list has been set manually: 'networkTimeout'," + + " 'sockTimeout', 'ackTimeout', 'maxAckTimeout', 'reconnectCount'."); + log.debug(configInfo("networkTimeout", netTimeout)); log.debug(configInfo("sockTimeout", sockTimeout)); log.debug(configInfo("ackTimeout", ackTimeout)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/347eb70c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java index 032cf01..b33c0c9 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java @@ -89,6 +89,9 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste @GridToStringExclude private volatile long lastUpdateTime = U.currentTimeMillis(); + /** The most recent time when a data chunk was received from a node. 
*/ + private volatile long lastDataReceivedTime = U.currentTimeMillis(); + /** Metrics provider (transient). */ @GridToStringExclude private DiscoveryMetricsProvider metricsProvider; @@ -385,6 +388,22 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste } /** + * Gets the most recent time when data was received. + * @return Last data received timestamp. + */ + public long lastDataReceivedTime() { + return lastDataReceivedTime; + } + + /** + * Sets the most recent time when data was received. + * @param lastDataReceivedTime Last data received timestamp. + */ + public void lastDataReceivedTime(long lastDataReceivedTime) { + this.lastDataReceivedTime = lastDataReceivedTime; + } + + /** * Gets visible flag. * * @return {@code true} if node is in visible state. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/347eb70c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java new file mode 100644 index 0000000..3249220 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.messages; + +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.spi.discovery.tcp.internal.*; + +/** + * Message used to check whether a node is still connected to the topology. + * Unlike {@link TcpDiscoveryStatusCheckMessage}, this message is sent to the next node, + * which replies directly to the sender without forwarding the message to the coordinator. + */ +public class TcpDiscoveryConnectionCheckMessage extends TcpDiscoveryAbstractMessage { + /** Status RECONNECT: sent back when the creator node is not found in the receiver's ring. */ + public static final int STATUS_RECON = 500; + + /** + * Constructor. + * + * @param creatorNode Node that created this message. + */ + public TcpDiscoveryConnectionCheckMessage(TcpDiscoveryNode creatorNode) { + super(creatorNode.id()); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TcpDiscoveryConnectionCheckMessage.class, this, "super", super.toString()); + } +}