http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
new file mode 100644
index 0000000..32d3f54
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -0,0 +1,5144 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.marshaller.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.*;
+import org.gridgain.grid.*;
+import org.gridgain.grid.kernal.*;
+import org.gridgain.grid.kernal.managers.security.*;
+import org.gridgain.grid.security.*;
+import org.apache.ignite.spi.discovery.*;
+import org.apache.ignite.spi.discovery.tcp.internal.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.*;
+import org.apache.ignite.spi.discovery.tcp.messages.*;
+import org.gridgain.grid.util.*;
+import org.gridgain.grid.util.future.*;
+import org.gridgain.grid.util.lang.*;
+import org.gridgain.grid.util.tostring.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.net.*;
+import java.text.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.events.IgniteEventType.*;
+import static org.gridgain.grid.kernal.GridNodeAttributes.*;
+import static org.apache.ignite.spi.IgnitePortProtocol.*;
+import static 
org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoverySpiState.*;
+import static 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHeartbeatMessage.*;
+import static 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessage.*;
+
+/**
+ * Discovery SPI implementation that uses TCP/IP for node discovery.
+ * <p>
+ * Nodes are organized in ring. So almost all network exchange (except few 
cases) is
+ * done across it.
+ * <p>
+ * At startup SPI tries to send messages to random IP taken from
+ * {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder} 
about self start (stops when send succeeds)
+ * and then this info goes to coordinator. When coordinator processes join 
request
+ * and issues node added messages and all other nodes then receive info about 
new node.
+ * <h1 class="header">Configuration</h1>
+ * <h2 class="header">Mandatory</h2>
+ * There are no mandatory configuration parameters.
+ * <h2 class="header">Optional</h2>
+ * The following configuration parameters are optional:
+ * <ul>
+ * <li>IP finder to share info about nodes IP addresses
+ * (see {@link 
#setIpFinder(org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder)}).
+ * See the following IP finder implementations for details on configuration:
+ * <ul>
+ * <li>{@link 
org.apache.ignite.spi.discovery.tcp.ipfinder.sharedfs.TcpDiscoverySharedFsIpFinder}</li>
+ * <li>{@gglink 
org.gridgain.grid.spi.discovery.tcp.ipfinder.s3.GridTcpDiscoveryS3IpFinder}</li>
+ * <li>{@link 
org.apache.ignite.spi.discovery.tcp.ipfinder.jdbc.TcpDiscoveryJdbcIpFinder}</li>
+ * <li>{@link 
org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder}</li>
+ * <li>{@link 
org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder}
 - default</li>
+ * </ul>
+ * </li>
+ * </ul>
+ * <ul>
+ * </li>
+ * <li>Local address (see {@link #setLocalAddress(String)})</li>
+ * <li>Local port to bind to (see {@link #setLocalPort(int)})</li>
+ * <li>Local port range to try binding to if previous ports are in use
+ *      (see {@link #setLocalPortRange(int)})</li>
+ * <li>Heartbeat frequency (see {@link #setHeartbeatFrequency(long)})</li>
+ * <li>Max missed heartbeats (see {@link #setMaxMissedHeartbeats(int)})</li>
+ * <li>Number of times node tries to (re)establish connection to another node
+ *      (see {@link #setReconnectCount(int)})</li>
+ * <li>Network timeout (see {@link #setNetworkTimeout(long)})</li>
+ * <li>Socket timeout (see {@link #setSocketTimeout(long)})</li>
+ * <li>Message acknowledgement timeout (see {@link #setAckTimeout(long)})</li>
+ * <li>Maximum message acknowledgement timeout (see {@link 
#setMaxAckTimeout(long)})</li>
+ * <li>Join timeout (see {@link #setJoinTimeout(long)})</li>
+ * <li>Thread priority for threads started by SPI (see {@link 
#setThreadPriority(int)})</li>
+ * <li>IP finder clean frequency (see {@link 
#setIpFinderCleanFrequency(long)})</li>
+ * <li>Statistics print frequency (see {@link 
#setStatisticsPrintFrequency(long)}</li>
+ * </ul>
+ * <h2 class="header">Java Example</h2>
+ * <pre name="code" class="java">
+ * GridTcpDiscoverySpi spi = new GridTcpDiscoverySpi();
+ *
+ * GridTcpDiscoveryVmIpFinder finder =
+ *     new GridTcpDiscoveryVmIpFinder();
+ *
+ * spi.setIpFinder(finder);
+ *
+ * GridConfiguration cfg = new GridConfiguration();
+ *
+ * // Override default discovery SPI.
+ * cfg.setDiscoverySpi(spi);
+ *
+ * // Start grid.
+ * GridGain.start(cfg);
+ * </pre>
+ * <h2 class="header">Spring Example</h2>
+ * GridTcpDiscoverySpi can be configured from Spring XML configuration file:
+ * <pre name="code" class="xml">
+ * &lt;bean id="grid.custom.cfg" class="org.gridgain.grid.GridConfiguration" 
singleton="true"&gt;
+ *         ...
+ *         &lt;property name="discoverySpi"&gt;
+ *             &lt;bean 
class="org.gridgain.grid.spi.discovery.tcp.GridTcpDiscoverySpi"&gt;
+ *                 &lt;property name="ipFinder"&gt;
+ *                     &lt;bean 
class="org.gridgain.grid.spi.discovery.tcp.ipfinder.vm.GridTcpDiscoveryVmIpFinder"
 /&gt;
+ *                 &lt;/property&gt;
+ *             &lt;/bean&gt;
+ *         &lt;/property&gt;
+ *         ...
+ * &lt;/bean&gt;
+ * </pre>
+ * <p>
+ * <img src="http://www.gridgain.com/images/spring-small.png";>
+ * <br>
+ * For information about Spring framework visit <a 
href="http://www.springframework.org/";>www.springframework.org</a>
+ * @see org.apache.ignite.spi.discovery.DiscoverySpi
+ */
+@SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
+@IgniteSpiMultipleInstancesSupport(true)
+@DiscoverySpiOrderSupport(true)
+@DiscoverySpiHistorySupport(true)
+public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements 
TcpDiscoverySpiMBean {
+    /** Default local port range (value is <tt>100</tt>). */
+    public static final int DFLT_PORT_RANGE = 100;
+
+    /** Default timeout for joining topology (value is <tt>0</tt>). */
+    public static final long DFLT_JOIN_TIMEOUT = 0;
+
+    /** Default reconnect attempts count (value is <tt>10</tt>). */
+    public static final int DFLT_RECONNECT_CNT = 10;
+
+    /** Default max heartbeats count node can miss without initiating status 
check (value is <tt>1</tt>). */
+    public static final int DFLT_MAX_MISSED_HEARTBEATS = 1;
+
+    /** Default max heartbeats count node can miss without failing client node 
(value is <tt>5</tt>). */
+    public static final int DFLT_MAX_MISSED_CLIENT_HEARTBEATS = 5;
+
+    /** Default IP finder clean frequency in milliseconds (value is 
<tt>60,000ms</tt>). */
+    public static final long DFLT_IP_FINDER_CLEAN_FREQ = 60 * 1000;
+
+    /** Default statistics print frequency in milliseconds (value is 
<tt>0ms</tt>). */
+    public static final long DFLT_STATS_PRINT_FREQ = 0;
+
+    /** Maximum ack timeout value for receiving message acknowledgement in 
milliseconds (value is <tt>600,000ms</tt>). */
+    public static final long DFLT_MAX_ACK_TIMEOUT = 10 * 60 * 1000;
+
+    /** Node attribute that is mapped to node's external addresses (value is 
<tt>disc.tcp.ext-addrs</tt>). */
+    public static final String ATTR_EXT_ADDRS = "disc.tcp.ext-addrs";
+
+    /** Address resolver. */
+    private IgniteAddressResolver addrRslvr;
+
+    /** Local port which node uses. */
+    private int locPort = DFLT_PORT;
+
+    /** Local port range. */
+    private int locPortRange = DFLT_PORT_RANGE;
+
+    /** Statistics print frequency. */
+    @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized", 
"RedundantFieldInitialization"})
+    private long statsPrintFreq = DFLT_STATS_PRINT_FREQ;
+
+    /** Maximum message acknowledgement timeout. */
+    private long maxAckTimeout = DFLT_MAX_ACK_TIMEOUT;
+
+    /** Join timeout. */
+    @SuppressWarnings("RedundantFieldInitialization")
+    private long joinTimeout = DFLT_JOIN_TIMEOUT;
+
+    /** Max heartbeats count node can miss without initiating status check. */
+    private int maxMissedHbs = DFLT_MAX_MISSED_HEARTBEATS;
+
+    /** Max heartbeats count node can miss without failing client node. */
+    private int maxMissedClientHbs = DFLT_MAX_MISSED_CLIENT_HEARTBEATS;
+
+    /** IP finder clean frequency. */
+    @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
+    private long ipFinderCleanFreq = DFLT_IP_FINDER_CLEAN_FREQ;
+
+    /** Reconnect attempts count. */
+    @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
+    private int reconCnt = DFLT_RECONNECT_CNT;
+
+    /** Grid marshaller. */
+    @IgniteMarshallerResource
+    private IgniteMarshaller gridMarsh;
+
+    /** Nodes ring. */
+    @GridToStringExclude
+    private final TcpDiscoveryNodesRing ring = new TcpDiscoveryNodesRing();
+
+    /** Topology snapshots history. */
+    private final SortedMap<Long, Collection<ClusterNode>> topHist = new 
TreeMap<>();
+
+    /** Socket readers. */
+    private final Collection<SocketReader> readers = new LinkedList<>();
+
+    /** TCP server for discovery SPI. */
+    private TcpServer tcpSrvr;
+
+    /** Message worker. */
+    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+    private RingMessageWorker msgWorker;
+
+    /** Client message workers. */
+    private ConcurrentMap<UUID, ClientMessageWorker> clientMsgWorkers = new 
ConcurrentHashMap8<>();
+
+    /** Metrics sender. */
+    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+    private HeartbeatsSender hbsSnd;
+
+    /** Status checker. */
+    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+    private CheckStatusSender chkStatusSnd;
+
+    /** IP finder cleaner. */
+    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+    private IpFinderCleaner ipFinderCleaner;
+
+    /** Statistics printer thread. */
+    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+    private StatisticsPrinter statsPrinter;
+
+    /** Failed nodes (but still in topology). */
+    private Collection<TcpDiscoveryNode> failedNodes = new HashSet<>();
+
+    /** Leaving nodes (but still in topology). */
+    private Collection<TcpDiscoveryNode> leavingNodes = new HashSet<>();
+
+    /** If non-shared IP finder is used this flag shows whether IP finder 
contains local address. */
+    private boolean ipFinderHasLocAddr;
+
+    /** Addresses that do not respond during join requests send (for resolving 
concurrent start). */
+    private final Collection<SocketAddress> noResAddrs = new 
GridConcurrentHashSet<>();
+
+    /** Addresses that incoming join requests send were send from (for 
resolving concurrent start). */
+    private final Collection<SocketAddress> fromAddrs = new 
GridConcurrentHashSet<>();
+
+    /** Response on join request from coordinator (in case of duplicate ID or 
auth failure). */
+    private final GridTuple<TcpDiscoveryAbstractMessage> joinRes = F.t1();
+
+    /** Context initialization latch. */
+    @GridToStringExclude
+    private final CountDownLatch ctxInitLatch = new CountDownLatch(1);
+
+    /** Node authenticator. */
+    private DiscoverySpiNodeAuthenticator nodeAuth;
+
+    /** Mutex. */
+    private final Object mux = new Object();
+
+    /** Map with proceeding ping requests. */
+    private final ConcurrentMap<InetSocketAddress, 
IgniteFuture<IgniteBiTuple<UUID, Boolean>>> pingMap =
+        new ConcurrentHashMap8<>();
+
+    /** Debug mode. */
+    private boolean debugMode;
+
+    /** Debug messages history. */
+    private int debugMsgHist = 512;
+
+    /** Received messages. */
+    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+    private ConcurrentLinkedDeque<String> debugLog;
+
+    /**
+     * Sets address resolver.
+     *
+     * @param addrRslvr Address resolver.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    @IgniteAddressResolverResource
+    public void setAddressResolver(IgniteAddressResolver addrRslvr) {
+        // Injection should not override value already set by Spring or user.
+        if (this.addrRslvr == null)
+            this.addrRslvr = addrRslvr;
+    }
+
+    /**
+     * Gets address resolver.
+     *
+     * @return Address resolver.
+     */
+    public IgniteAddressResolver getAddressResolver() {
+        return addrRslvr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getReconnectCount() {
+        return reconCnt;
+    }
+
+    /**
+     * Number of times node tries to (re)establish connection to another node.
+     * <p>
+     * Note that SPI implementation will increase {@link #ackTimeout} by 
factor 2
+     * on every retry.
+     * <p>
+     * If not specified, default is {@link #DFLT_RECONNECT_CNT}.
+     *
+     * @param reconCnt Number of retries during message sending.
+     * @see #setAckTimeout(long)
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public void setReconnectCount(int reconCnt) {
+        this.reconCnt = reconCnt;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getMaxAckTimeout() {
+        return maxAckTimeout;
+    }
+
+    /**
+     * Sets maximum timeout for receiving acknowledgement for sent message.
+     * <p>
+     * If acknowledgement is not received within this timeout, sending is 
considered as failed
+     * and SPI tries to repeat message sending. Every time SPI retries messing 
sending, ack
+     * timeout will be increased. If no acknowledgement is received and {@code 
maxAckTimeout}
+     * is reached, then the process of message sending is considered as failed.
+     * <p>
+     * If not specified, default is {@link #DFLT_MAX_ACK_TIMEOUT}.
+     *
+     * @param maxAckTimeout Maximum acknowledgement timeout.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public void setMaxAckTimeout(long maxAckTimeout) {
+        this.maxAckTimeout = maxAckTimeout;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getJoinTimeout() {
+        return joinTimeout;
+    }
+
+    /**
+     * Sets join timeout.
+     * <p>
+     * If non-shared IP finder is used and node fails to connect to
+     * any address from IP finder, node keeps trying to join within this
+     * timeout. If all addresses are still unresponsive, exception is thrown
+     * and node startup fails.
+     * <p>
+     * If not specified, default is {@link #DFLT_JOIN_TIMEOUT}.
+     *
+     * @param joinTimeout Join timeout ({@code 0} means wait forever).
+     *
+     * @see 
org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder#isShared()
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public void setJoinTimeout(long joinTimeout) {
+        this.joinTimeout = joinTimeout;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getLocalPort() {
+        TcpDiscoveryNode locNode0 = locNode;
+
+        return locNode0 != null ? locNode0.discoveryPort() : 0;
+    }
+
+    /**
+     * Sets local port to listen to.
+     * <p>
+     * If not specified, default is {@link #DFLT_PORT}.
+     *
+     * @param locPort Local port to bind.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public void setLocalPort(int locPort) {
+        this.locPort = locPort;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getLocalPortRange() {
+        return locPortRange;
+    }
+
+    /**
+     * Range for local ports. Local node will try to bind on first available 
port
+     * starting from {@link #getLocalPort()} up until
+     * <tt>{@link #getLocalPort()} {@code + locPortRange}</tt>.
+     * <p>
+     * If not specified, default is {@link #DFLT_PORT_RANGE}.
+     *
+     * @param locPortRange Local port range to bind.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public void setLocalPortRange(int locPortRange) {
+        this.locPortRange = locPortRange;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getMaxMissedHeartbeats() {
+        return maxMissedHbs;
+    }
+
+    /**
+     * Sets max heartbeats count node can miss without initiating status check.
+     * <p>
+     * If not provided, default value is {@link #DFLT_MAX_MISSED_HEARTBEATS}.
+     *
+     * @param maxMissedHbs Max missed heartbeats.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public void setMaxMissedHeartbeats(int maxMissedHbs) {
+        this.maxMissedHbs = maxMissedHbs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getMaxMissedClientHeartbeats() {
+        return maxMissedClientHbs;
+    }
+
+    /**
+     * Sets max heartbeats count node can miss without failing client node.
+     * <p>
+     * If not provided, default value is {@link 
#DFLT_MAX_MISSED_CLIENT_HEARTBEATS}.
+     *
+     * @param maxMissedClientHbs Max missed client heartbeats.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public void setMaxMissedClientHeartbeats(int maxMissedClientHbs) {
+        this.maxMissedClientHbs = maxMissedClientHbs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getStatisticsPrintFrequency() {
+        return statsPrintFreq;
+    }
+
+    /**
+     * Sets statistics print frequency.
+     * <p>
+     * If not set default value is {@link #DFLT_STATS_PRINT_FREQ}.
+     * 0 indicates that no print is required. If value is greater than 0 and 
log is
+     * not quiet then statistics are printed out with INFO level.
+     * <p>
+     * This may be very helpful for tracing topology problems.
+     *
+     * @param statsPrintFreq Statistics print frequency in milliseconds.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public void setStatisticsPrintFrequency(long statsPrintFreq) {
+        this.statsPrintFreq = statsPrintFreq;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getIpFinderCleanFrequency() {
+        return ipFinderCleanFreq;
+    }
+
+    /**
+     * Sets IP finder clean frequency in milliseconds.
+     * <p>
+     * If not provided, default value is {@link #DFLT_IP_FINDER_CLEAN_FREQ}
+     *
+     * @param ipFinderCleanFreq IP finder clean frequency.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public void setIpFinderCleanFrequency(long ipFinderCleanFreq) {
+        this.ipFinderCleanFreq = ipFinderCleanFreq;
+    }
+
+    /**
+     * This method is intended for troubleshooting purposes only.
+     *
+     * @param debugMode {code True} to start SPI in debug mode.
+     */
+    public void setDebugMode(boolean debugMode) {
+        this.debugMode = debugMode;
+    }
+
+    /**
+     * This method is intended for troubleshooting purposes only.
+     *
+     * @param debugMsgHist Message history log size.
+     */
+    public void setDebugMessageHistory(int debugMsgHist) {
+        this.debugMsgHist = debugMsgHist;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getSpiState() {
+        synchronized (mux) {
+            return spiState.name();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getSocketTimeout() {
+        return sockTimeout;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getAckTimeout() {
+        return ackTimeout;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getNetworkTimeout() {
+        return netTimeout;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getThreadPriority() {
+        return threadPri;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getHeartbeatFrequency() {
+        return hbFreq;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getIpFinderFormatted() {
+        return ipFinder.toString();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getMessageWorkerQueueSize() {
+        return msgWorker.queueSize();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getNodesJoined() {
+        return stats.joinedNodesCount();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getNodesLeft() {
+        return stats.leftNodesCount();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getNodesFailed() {
+        return stats.failedNodesCount();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getPendingMessagesRegistered() {
+        return stats.pendingMessagesRegistered();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getPendingMessagesDiscarded() {
+        return stats.pendingMessagesDiscarded();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getAvgMessageProcessingTime() {
+        return stats.avgMessageProcessingTime();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getMaxMessageProcessingTime() {
+        return stats.maxMessageProcessingTime();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getTotalReceivedMessages() {
+        return stats.totalReceivedMessages();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<String, Integer> getReceivedMessages() {
+        return stats.receivedMessages();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getTotalProcessedMessages() {
+        return stats.totalProcessedMessages();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<String, Integer> getProcessedMessages() {
+        return stats.processedMessages();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getCoordinatorSinceTimestamp() {
+        return stats.coordinatorSinceTimestamp();
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public UUID getCoordinator() {
+        TcpDiscoveryNode crd = resolveCoordinator();
+
+        return crd != null ? crd.id() : null;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public ClusterNode getNode(UUID nodeId) {
+        assert nodeId != null;
+
+        UUID locNodeId0 = locNodeId;
+
+        if (locNodeId0 != null && locNodeId0.equals(nodeId))
+            // Return local node directly.
+            return locNode;
+
+        TcpDiscoveryNode node = ring.node(nodeId);
+
+        if (node != null && !node.visible())
+            return null;
+
+        return node;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<ClusterNode> getRemoteNodes() {
+        return F.<TcpDiscoveryNode, 
ClusterNode>upcast(ring.visibleRemoteNodes());
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<Object> injectables() {
+        Collection<Object> res = new LinkedList<>();
+
+        if (ipFinder != null)
+            res.add(ipFinder);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void spiStart(String gridName) throws IgniteSpiException {
+        spiStart0(false);
+    }
+
+    /**
+     * Starts or restarts SPI after stop (to reconnect).
+     *
+     * @param restart {@code True} if SPI is restarted after stop.
+     * @throws org.apache.ignite.spi.IgniteSpiException If failed.
+     */
+    private void spiStart0(boolean restart) throws IgniteSpiException {
+        if (!restart)
+            // It is initial start.
+            onSpiStart();
+
+        synchronized (mux) {
+            spiState = DISCONNECTED;
+        }
+
+        if (debugMode) {
+            if (!log.isInfoEnabled())
+                throw new IgniteSpiException("Info log level should be enabled 
for TCP discovery to work " +
+                    "in debug mode.");
+
+            debugLog = new ConcurrentLinkedDeque<>();
+
+            U.quietAndWarn(log, "TCP discovery SPI is configured in debug 
mode.");
+        }
+
+        // Clear addresses collections.
+        fromAddrs.clear();
+        noResAddrs.clear();
+
+        sockTimeoutWorker = new SocketTimeoutWorker();
+        sockTimeoutWorker.start();
+
+        msgWorker = new RingMessageWorker();
+        msgWorker.start();
+
+        tcpSrvr = new TcpServer();
+
+        // Init local node.
+        IgniteBiTuple<Collection<String>, Collection<String>> addrs;
+
+        try {
+            addrs = U.resolveLocalAddresses(locHost);
+        }
+        catch (IOException | GridException e) {
+            throw new IgniteSpiException("Failed to resolve local host to set 
of external addresses: " + locHost, e);
+        }
+
+        locNode = new TcpDiscoveryNode(
+            locNodeId,
+            addrs.get1(),
+            addrs.get2(),
+            tcpSrvr.port,
+            metricsProvider,
+            locNodeVer);
+
+        try {
+            Collection<InetSocketAddress> extAddrs = addrRslvr == null ? null :
+                U.resolveAddresses(addrRslvr, 
F.flat(Arrays.asList(addrs.get1(), addrs.get2())),
+                    locNode.discoveryPort());
+
+            if (extAddrs != null)
+                locNodeAttrs.put(createSpiAttributeName(ATTR_EXT_ADDRS), 
extAddrs);
+        }
+        catch (GridException e) {
+            throw new IgniteSpiException("Failed to resolve local host to 
addresses: " + locHost, e);
+        }
+
+        locNode.setAttributes(locNodeAttrs);
+
+        locNode.local(true);
+
+        locNodeAddrs = getNodeAddresses(locNode);
+
+        if (log.isDebugEnabled())
+            log.debug("Local node initialized: " + locNode);
+
+        // Start TCP server thread after local node is initialized.
+        tcpSrvr.start();
+
+        ring.localNode(locNode);
+
+        if (ipFinder.isShared())
+            registerLocalNodeAddress();
+        else {
+            if (F.isEmpty(ipFinder.getRegisteredAddresses()))
+                throw new IgniteSpiException("Non-shared IP finder must have 
IP addresses specified in " +
+                    "GridTcpDiscoveryIpFinder.getRegisteredAddresses() 
configuration property " +
+                    "(specify list of IP addresses in configuration).");
+
+            ipFinderHasLocAddr = ipFinderHasLocalAddress();
+        }
+
+        if (statsPrintFreq > 0 && log.isInfoEnabled()) {
+            statsPrinter = new StatisticsPrinter();
+            statsPrinter.start();
+        }
+
+        stats.onJoinStarted();
+
+        joinTopology();
+
+        stats.onJoinFinished();
+
+        hbsSnd = new HeartbeatsSender();
+        hbsSnd.start();
+
+        chkStatusSnd = new CheckStatusSender();
+        chkStatusSnd.start();
+
+        if (ipFinder.isShared()) {
+            ipFinderCleaner = new IpFinderCleaner();
+            ipFinderCleaner.start();
+        }
+
+        if (log.isDebugEnabled() && !restart)
+            log.debug(startInfo());
+
+        if (restart)
+            getSpiContext().registerPort(tcpSrvr.port, TCP);
+    }
+
+    /**
+     * @throws org.apache.ignite.spi.IgniteSpiException If failed.
+     */
+    @SuppressWarnings("BusyWait")
+    private void registerLocalNodeAddress() throws IgniteSpiException {
+        // Make sure address registration succeeded.
+        while (true) {
+            try {
+                ipFinder.initializeLocalAddresses(locNode.socketAddresses());
+
+                // Success.
+                break;
+            }
+            catch (IllegalStateException e) {
+                throw new IgniteSpiException("Failed to register local node 
address with IP finder: " +
+                    locNode.socketAddresses(), e);
+            }
+            catch (IgniteSpiException e) {
+                LT.error(log, e, "Failed to register local node address in IP 
finder on start " +
+                    "(retrying every 2000 ms).");
+            }
+
+            try {
+                U.sleep(2000);
+            }
+            catch (GridInterruptedException e) {
+                throw new IgniteSpiException("Thread has been interrupted.", 
e);
+            }
+        }
+    }
+
+    /**
+     * @throws org.apache.ignite.spi.IgniteSpiException If failed.
+     */
+    private void onSpiStart() throws IgniteSpiException {
+        startStopwatch();
+
+        assertParameter(ipFinder != null, "ipFinder != null");
+        assertParameter(ipFinderCleanFreq > 0, "ipFinderCleanFreq > 0");
+        assertParameter(locPort > 1023, "localPort > 1023");
+        assertParameter(locPortRange >= 0, "localPortRange >= 0");
+        assertParameter(locPort + locPortRange <= 0xffff, "locPort + 
locPortRange <= 0xffff");
+        assertParameter(netTimeout > 0, "networkTimeout > 0");
+        assertParameter(sockTimeout > 0, "sockTimeout > 0");
+        assertParameter(ackTimeout > 0, "ackTimeout > 0");
+        assertParameter(maxAckTimeout > ackTimeout, "maxAckTimeout > 
ackTimeout");
+        assertParameter(reconCnt > 0, "reconnectCnt > 0");
+        assertParameter(hbFreq > 0, "heartbeatFreq > 0");
+        assertParameter(maxMissedHbs > 0, "maxMissedHeartbeats > 0");
+        assertParameter(maxMissedClientHbs > 0, "maxMissedClientHeartbeats > 
0");
+        assertParameter(threadPri > 0, "threadPri > 0");
+        assertParameter(statsPrintFreq >= 0, "statsPrintFreq >= 0");
+
+        try {
+            locHost = U.resolveLocalHost(locAddr);
+        }
+        catch (IOException e) {
+            throw new IgniteSpiException("Unknown local address: " + locAddr, 
e);
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug(configInfo("localHost", locHost.getHostAddress()));
+            log.debug(configInfo("localPort", locPort));
+            log.debug(configInfo("localPortRange", locPortRange));
+            log.debug(configInfo("threadPri", threadPri));
+            log.debug(configInfo("networkTimeout", netTimeout));
+            log.debug(configInfo("sockTimeout", sockTimeout));
+            log.debug(configInfo("ackTimeout", ackTimeout));
+            log.debug(configInfo("maxAckTimeout", maxAckTimeout));
+            log.debug(configInfo("reconnectCount", reconCnt));
+            log.debug(configInfo("ipFinder", ipFinder));
+            log.debug(configInfo("ipFinderCleanFreq", ipFinderCleanFreq));
+            log.debug(configInfo("heartbeatFreq", hbFreq));
+            log.debug(configInfo("maxMissedHeartbeats", maxMissedHbs));
+            log.debug(configInfo("statsPrintFreq", statsPrintFreq));
+        }
+
+        // Warn on odd network timeout.
+        if (netTimeout < 3000)
+            U.warn(log, "Network timeout is too low (at least 3000 ms 
recommended): " + netTimeout);
+
+        // Warn on odd heartbeat frequency.
+        if (hbFreq < 2000)
+            U.warn(log, "Heartbeat frequency is too high (at least 2000 ms 
recommended): " + hbFreq);
+
+        registerMBean(gridName, this, TcpDiscoverySpiMBean.class);
+
+        if (ipFinder instanceof TcpDiscoveryMulticastIpFinder) {
+            TcpDiscoveryMulticastIpFinder mcastIpFinder = 
((TcpDiscoveryMulticastIpFinder)ipFinder);
+
+            if (mcastIpFinder.getLocalAddress() == null)
+                mcastIpFinder.setLocalAddress(locAddr);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onContextInitialized0(IgniteSpiContext spiCtx) 
throws IgniteSpiException {
+        super.onContextInitialized0(spiCtx);
+
+        ctxInitLatch.countDown();
+
+        spiCtx.registerPort(tcpSrvr.port, TCP);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteSpiContext getSpiContext() {
+        if (ctxInitLatch.getCount() > 0) {
+            if (log.isDebugEnabled())
+                log.debug("Waiting for context initialization.");
+
+            try {
+                U.await(ctxInitLatch);
+
+                if (log.isDebugEnabled())
+                    log.debug("Context has been initialized.");
+            }
+            catch (GridInterruptedException e) {
+                U.warn(log, "Thread has been interrupted while waiting for SPI 
context initialization.", e);
+            }
+        }
+
+        return super.getSpiContext();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void spiStop() throws IgniteSpiException {
+        spiStop0(false);
+    }
+
+    /**
+     * Stops SPI finally or stops SPI for restart.
+     *
+     * @param disconnect {@code True} if SPI is being disconnected.
+     * @throws org.apache.ignite.spi.IgniteSpiException If failed.
+     */
+    private void spiStop0(boolean disconnect) throws IgniteSpiException {
+        if (ctxInitLatch.getCount() > 0)
+            // Safety.
+            ctxInitLatch.countDown();
+
+        if (log.isDebugEnabled()) {
+            if (disconnect)
+                log.debug("Disconnecting SPI.");
+            else
+                log.debug("Preparing to start local node stop procedure.");
+        }
+
+        if (disconnect) {
+            synchronized (mux) {
+                spiState = DISCONNECTING;
+            }
+        }
+
+        if (msgWorker != null && msgWorker.isAlive() && !disconnect) {
+            // Send node left message only if it is final stop.
+            msgWorker.addMessage(new TcpDiscoveryNodeLeftMessage(locNodeId));
+
+            synchronized (mux) {
+                long threshold = U.currentTimeMillis() + netTimeout;
+
+                long timeout = netTimeout;
+
+                while (spiState != LEFT && timeout > 0) {
+                    try {
+                        mux.wait(timeout);
+
+                        timeout = threshold - U.currentTimeMillis();
+                    }
+                    catch (InterruptedException ignored) {
+                        Thread.currentThread().interrupt();
+
+                        break;
+                    }
+                }
+
+                if (spiState == LEFT) {
+                    if (log.isDebugEnabled())
+                        log.debug("Verification for local node leave has been 
received from coordinator" +
+                            " (continuing stop procedure).");
+                }
+                else if (log.isInfoEnabled()) {
+                    log.info("No verification for local node leave has been 
received from coordinator" +
+                        " (will stop node anyway).");
+                }
+            }
+        }
+
+        U.interrupt(tcpSrvr);
+        U.join(tcpSrvr, log);
+
+        Collection<SocketReader> tmp;
+
+        synchronized (mux) {
+            tmp = U.arrayList(readers);
+        }
+
+        U.interrupt(tmp);
+        U.joinThreads(tmp, log);
+
+        U.interrupt(hbsSnd);
+        U.join(hbsSnd, log);
+
+        U.interrupt(chkStatusSnd);
+        U.join(chkStatusSnd, log);
+
+        U.interrupt(ipFinderCleaner);
+        U.join(ipFinderCleaner, log);
+
+        U.interrupt(msgWorker);
+        U.join(msgWorker, log);
+
+        U.interrupt(sockTimeoutWorker);
+        U.join(sockTimeoutWorker, log);
+
+        U.interrupt(statsPrinter);
+        U.join(statsPrinter, log);
+
+        if (ipFinder != null)
+            ipFinder.close();
+
+        Collection<TcpDiscoveryNode> rmts = null;
+
+        if (!disconnect) {
+            // This is final stop.
+            unregisterMBean();
+
+            if (log.isDebugEnabled())
+                log.debug(stopInfo());
+        }
+        else {
+            getSpiContext().deregisterPorts();
+
+            rmts = ring.visibleRemoteNodes();
+        }
+
+        long topVer = ring.topologyVersion();
+
+        ring.clear();
+
+        if (rmts != null && !rmts.isEmpty()) {
+            // This is restart/disconnection and remote nodes are not empty.
+            // We need to fire FAIL event for each.
+            DiscoverySpiListener lsnr = this.lsnr;
+
+            if (lsnr != null) {
+                Collection<ClusterNode> processed = new LinkedList<>();
+
+                for (TcpDiscoveryNode n : rmts) {
+                    assert n.visible();
+
+                    processed.add(n);
+
+                    Collection<ClusterNode> top = F.viewReadOnly(rmts, 
F.<ClusterNode>identity(), F.notIn(processed));
+
+                    topVer++;
+
+                    Map<Long, Collection<ClusterNode>> hist = 
updateTopologyHistory(topVer, top);
+
+                    lsnr.onDiscovery(EVT_NODE_FAILED, topVer, n, top, hist);
+                }
+            }
+        }
+
+        printStatistics();
+
+        stats.clear();
+
+        synchronized (mux) {
+            // Clear stored data.
+            leavingNodes.clear();
+            failedNodes.clear();
+
+            spiState = DISCONNECTED;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void onContextDestroyed0() {
+        super.onContextDestroyed0();
+
+        if (ctxInitLatch.getCount() > 0)
+            // Safety.
+            ctxInitLatch.countDown();
+
+        getSpiContext().deregisterPorts();
+    }
+
+    /**
+     * @throws org.apache.ignite.spi.IgniteSpiException If any error occurs.
+     * @return {@code true} if IP finder contains local address.
+     */
+    private boolean ipFinderHasLocalAddress() throws IgniteSpiException {
+        for (InetSocketAddress locAddr : locNodeAddrs) {
+            for (InetSocketAddress addr : registeredAddresses())
+                try {
+                    int port = addr.getPort();
+
+                    InetSocketAddress resolved = addr.isUnresolved() ?
+                        new 
InetSocketAddress(InetAddress.getByName(addr.getHostName()), port) :
+                        new InetSocketAddress(addr.getAddress(), port);
+
+                    if (resolved.equals(locAddr))
+                        return true;
+                }
+                catch (UnknownHostException ignored) {
+                    // No-op.
+                }
+        }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean pingNode(UUID nodeId) {
+        assert nodeId != null;
+
+        if (nodeId == locNodeId)
+            return true;
+
+        TcpDiscoveryNode node = ring.node(nodeId);
+
+        if (node == null || !node.visible())
+            return false;
+
+        boolean res = pingNode(node);
+
+        if (!res && !node.isClient()) {
+            LT.warn(log, null, "Failed to ping node (status check will be 
initiated): " + nodeId);
+
+            msgWorker.addMessage(new TcpDiscoveryStatusCheckMessage(locNode, 
node.id()));
+        }
+
+        return res;
+    }
+
+    /**
+     * Pings the remote node to see if it's alive.
+     *
+     * @param node Node.
+     * @return {@code True} if ping succeeds.
+     */
+    private boolean pingNode(TcpDiscoveryNode node) {
+        assert node != null;
+
+        if (node.id().equals(locNodeId))
+            return true;
+
+        UUID clientNodeId = null;
+
+        if (node.isClient()) {
+            clientNodeId = node.id();
+
+            node = ring.node(node.clientRouterNodeId());
+
+            if (node == null || !node.visible())
+                return false;
+        }
+
+        for (InetSocketAddress addr : getNodeAddresses(node, 
U.sameMacs(locNode, node))) {
+            try {
+                // ID returned by the node should be the same as ID of the 
parameter for ping to succeed.
+                IgniteBiTuple<UUID, Boolean> t = pingNode(addr, clientNodeId);
+
+                return node.id().equals(t.get1()) && (clientNodeId == null || 
t.get2());
+            }
+            catch (GridException e) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to ping node [node=" + node + ", err=" + 
e.getMessage() + ']');
+
+                // continue;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Pings the remote node by its address to see if it's alive.
+     *
+     * @param addr Address of the node.
+     * @return ID of the remote node if node alive.
+     * @throws org.apache.ignite.spi.IgniteSpiException If an error occurs.
+     */
+    private IgniteBiTuple<UUID, Boolean> pingNode(InetSocketAddress addr, 
@Nullable UUID clientNodeId)
+        throws GridException {
+        assert addr != null;
+
+        if (F.contains(locNodeAddrs, addr))
+            return F.t(locNodeId, false);
+
+        GridFutureAdapterEx<IgniteBiTuple<UUID, Boolean>> fut = new 
GridFutureAdapterEx<>();
+
+        IgniteFuture<IgniteBiTuple<UUID, Boolean>> oldFut = 
pingMap.putIfAbsent(addr, fut);
+
+        if (oldFut != null)
+            return oldFut.get();
+        else {
+            Collection<Throwable> errs = null;
+
+            try {
+                Socket sock = null;
+
+                for (int i = 0; i < reconCnt; i++) {
+                    try {
+                        if (addr.isUnresolved())
+                            addr = new 
InetSocketAddress(InetAddress.getByName(addr.getHostName()), addr.getPort());
+
+                        long tstamp = U.currentTimeMillis();
+
+                        sock = openSocket(addr);
+
+                        writeToSocket(sock, new 
TcpDiscoveryPingRequest(locNodeId, clientNodeId));
+
+                        TcpDiscoveryPingResponse res = readMessage(sock, null, 
netTimeout);
+
+                        if (locNodeId.equals(res.creatorNodeId())) {
+                            if (log.isDebugEnabled())
+                                log.debug("Ping response from local node: " + 
res);
+
+                            break;
+                        }
+
+                        stats.onClientSocketInitialized(U.currentTimeMillis() 
- tstamp);
+
+                        IgniteBiTuple<UUID, Boolean> t = 
F.t(res.creatorNodeId(), res.clientExists());
+
+                        fut.onDone(t);
+
+                        return t;
+                    }
+                    catch (IOException | GridException e) {
+                        if (errs == null)
+                            errs = new ArrayList<>();
+
+                        errs.add(e);
+                    }
+                    finally {
+                        U.closeQuiet(sock);
+                    }
+                }
+            }
+            catch (Throwable t) {
+                fut.onDone(t);
+
+                throw U.cast(t);
+            }
+            finally {
+                if (!fut.isDone())
+                    fut.onDone(U.exceptionWithSuppressed("Failed to ping node 
by address: " + addr, errs));
+
+                boolean b = pingMap.remove(addr, fut);
+
+                assert b;
+            }
+
+            return fut.get();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void disconnect() throws IgniteSpiException {
+        spiStop0(true);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setAuthenticator(DiscoverySpiNodeAuthenticator 
nodeAuth) {
+        this.nodeAuth = nodeAuth;
+    }
+
+    /**
+     * Tries to join this node to topology.
+     *
+     * @throws org.apache.ignite.spi.IgniteSpiException If any error occurs.
+     */
+    private void joinTopology() throws IgniteSpiException {
+        synchronized (mux) {
+            assert spiState == CONNECTING || spiState == DISCONNECTED;
+
+            spiState = CONNECTING;
+        }
+
+        GridSecurityCredentials locCred = 
(GridSecurityCredentials)locNode.getAttributes()
+            .get(GridNodeAttributes.ATTR_SECURITY_CREDENTIALS);
+
+        // Marshal credentials for backward compatibility and security.
+        marshalCredentials(locNode);
+
+        while (true) {
+            if (!sendJoinRequestMessage()) {
+                if (log.isDebugEnabled())
+                    log.debug("Join request message has not been sent (local 
node is the first in the topology).");
+
+                // Authenticate local node.
+                try {
+                    GridSecurityContext subj = 
nodeAuth.authenticateNode(locNode, locCred);
+
+                    if (subj == null)
+                        throw new IgniteSpiException("Authentication failed 
for local node: " + locNode.id());
+
+                    Map<String, Object> attrs = new 
HashMap<>(locNode.attributes());
+
+                    attrs.put(GridNodeAttributes.ATTR_SECURITY_SUBJECT, 
gridMarsh.marshal(subj));
+                    attrs.remove(GridNodeAttributes.ATTR_SECURITY_CREDENTIALS);
+
+                    locNode.setAttributes(attrs);
+                }
+                catch (GridException e) {
+                    throw new IgniteSpiException("Failed to authenticate local 
node (will shutdown local node).", e);
+                }
+
+                locNode.order(1);
+                locNode.internalOrder(1);
+
+                gridStartTime = U.currentTimeMillis();
+
+                locNode.visible(true);
+
+                ring.clear();
+
+                ring.topologyVersion(1);
+
+                synchronized (mux) {
+                    topHist.clear();
+
+                    spiState = CONNECTED;
+
+                    mux.notifyAll();
+                }
+
+                notifyDiscovery(EVT_NODE_JOINED, 1, locNode);
+
+                break;
+            }
+
+            if (log.isDebugEnabled())
+                log.debug("Join request message has been sent (waiting for 
coordinator response).");
+
+            synchronized (mux) {
+                long threshold = U.currentTimeMillis() + netTimeout;
+
+                long timeout = netTimeout;
+
+                while (spiState == CONNECTING && timeout > 0) {
+                    try {
+                        mux.wait(timeout);
+
+                        timeout = threshold - U.currentTimeMillis();
+                    }
+                    catch (InterruptedException ignored) {
+                        Thread.currentThread().interrupt();
+
+                        throw new IgniteSpiException("Thread has been 
interrupted.");
+                    }
+                }
+
+                if (spiState == CONNECTED)
+                    break;
+                else if (spiState == DUPLICATE_ID)
+                    throw 
duplicateIdError((TcpDiscoveryDuplicateIdMessage)joinRes.get());
+                else if (spiState == AUTH_FAILED)
+                    throw 
authenticationFailedError((TcpDiscoveryAuthFailedMessage)joinRes.get());
+                else if (spiState == CHECK_FAILED)
+                    throw 
checkFailedError((TcpDiscoveryCheckFailedMessage)joinRes.get());
+                else if (spiState == LOOPBACK_PROBLEM) {
+                    TcpDiscoveryLoopbackProblemMessage msg = 
(TcpDiscoveryLoopbackProblemMessage)joinRes.get();
+
+                    boolean locHostLoopback = locHost.isLoopbackAddress();
+
+                    String firstNode = locHostLoopback ? "local" : "remote";
+
+                    String secondNode = locHostLoopback ? "remote" : "local";
+
+                    throw new IgniteSpiException("Failed to add node to 
topology because " + firstNode +
+                        " node is configured to use loopback address, but " + 
secondNode + " node is not " +
+                        "(consider changing 'localAddress' configuration 
parameter) " +
+                        "[locNodeAddrs=" + U.addressesAsString(locNode) + ", 
rmtNodeAddrs=" +
+                        U.addressesAsString(msg.addresses(), msg.hostNames()) 
+ ']');
+                }
+                else
+                    LT.warn(log, null, "Node has not been connected to 
topology and will repeat join process. " +
+                        "Check remote nodes logs for possible error messages. 
" +
+                        "Note that large topology may require significant time 
to start. " +
+                        "Increase 'GridTcpDiscoverySpi.networkTimeout' 
configuration property " +
+                        "if getting this message on the starting nodes 
[networkTimeout=" + netTimeout + ']');
+            }
+        }
+
+        assert locNode.order() != 0;
+        assert locNode.internalOrder() != 0;
+
+        if (log.isDebugEnabled())
+            log.debug("Discovery SPI has been connected to topology with 
order: " + locNode.internalOrder());
+    }
+
+    /**
+     * @param msg Error message.
+     * @return Remote grid version parsed from error message.
+     * @deprecated This method was created for preserving backward 
compatibility. During major version update
+     *      parsing of error message should be replaced with new {@link 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage}
+     *      which contains all necessary information.
+     */
+    @Deprecated
+    @Nullable private String parseRemoteVersion(String msg) {
+        msg = msg.replaceAll("\\s", "");
+
+        final String verPrefix = "rmtBuildVer=";
+
+        int startIdx = msg.indexOf(verPrefix);
+        int endIdx = msg.indexOf(',', startIdx);
+
+        if (endIdx < 0)
+            endIdx = msg.indexOf(']', startIdx);
+
+        if (startIdx < 0 || endIdx < 0)
+            return null;
+
+        return msg.substring(startIdx + verPrefix.length() - 1, endIdx);
+    }
+
+    /**
+     * Tries to send join request message to a random node presenting in 
topology.
+     * Address is provided by {@link 
org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder} and message 
is
+     * sent to first node connection succeeded to.
+     *
+     * @return {@code true} if send succeeded.
+     * @throws org.apache.ignite.spi.IgniteSpiException If any error occurs.
+     */
+    @SuppressWarnings({"BusyWait"})
+    private boolean sendJoinRequestMessage() throws IgniteSpiException {
+        TcpDiscoveryAbstractMessage joinReq = new 
TcpDiscoveryJoinRequestMessage(locNode,
+            exchange.collect(locNodeId));
+
+        // Time when it has been detected, that addresses from IP finder do 
not respond.
+        long noResStart = 0;
+
+        while (true) {
+            Collection<InetSocketAddress> addrs = resolvedAddresses();
+
+            if (F.isEmpty(addrs))
+                return false;
+
+            boolean retry = false;
+            GridException errs = null;
+
+            for (InetSocketAddress addr : addrs) {
+                try {
+                    Integer res = sendMessageDirectly(joinReq, addr);
+
+                    assert res != null;
+
+                    noResAddrs.remove(addr);
+
+                    // Address is responsive, reset period start.
+                    noResStart = 0;
+
+                    switch (res) {
+                        case RES_WAIT:
+                            // Concurrent startup, try sending join request 
again or wait if no success.
+                            retry = true;
+
+                            break;
+                        case RES_OK:
+                            if (log.isDebugEnabled())
+                                log.debug("Join request message has been sent 
to address [addr=" + addr +
+                                    ", req=" + joinReq + ']');
+
+                            // Join request sending succeeded, wait for 
response from topology.
+                            return true;
+
+                        default:
+                            // Concurrent startup, try next node.
+                            if (res == RES_CONTINUE_JOIN) {
+                                if (!fromAddrs.contains(addr))
+                                    retry = true;
+                            }
+                            else {
+                                if (log.isDebugEnabled())
+                                    log.debug("Unexpected response to join 
request: " + res);
+
+                                retry = true;
+                            }
+
+                            break;
+                    }
+                }
+                catch (IgniteSpiException e) {
+                    if (errs == null)
+                        errs = new GridException("Multiple connection attempts 
failed.");
+
+                    errs.addSuppressed(e);
+
+                    if (log.isDebugEnabled()) {
+                        IOException ioe = X.cause(e, IOException.class);
+
+                        log.debug("Failed to send join request message [addr=" 
+ addr +
+                            ", msg=" + ioe != null ? ioe.getMessage() : 
e.getMessage() + ']');
+                    }
+
+                    noResAddrs.add(addr);
+                }
+            }
+
+            if (retry) {
+                if (log.isDebugEnabled())
+                    log.debug("Concurrent discovery SPI start has been 
detected (local node should wait).");
+
+                try {
+                    U.sleep(2000);
+                }
+                catch (GridInterruptedException e) {
+                    throw new IgniteSpiException("Thread has been 
interrupted.", e);
+                }
+            }
+            else if (!ipFinder.isShared() && !ipFinderHasLocAddr) {
+                if (errs != null && X.hasCause(errs, ConnectException.class))
+                    LT.warn(log, null, "Failed to connect to any address from 
IP finder " +
+                        "(make sure IP finder addresses are correct and 
firewalls are disabled on all host machines): " +
+                        addrs);
+
+                if (joinTimeout > 0) {
+                    if (noResStart == 0)
+                        noResStart = U.currentTimeMillis();
+                    else if (U.currentTimeMillis() - noResStart > joinTimeout)
+                        throw new IgniteSpiException(
+                            "Failed to connect to any address from IP finder 
within join timeout " +
+                                "(make sure IP finder addresses are correct, 
and operating system firewalls are disabled " +
+                                "on all host machines, or consider increasing 
'joinTimeout' configuration property): " +
+                                addrs, errs);
+                }
+
+                try {
+                    U.sleep(2000);
+                }
+                catch (GridInterruptedException e) {
+                    throw new IgniteSpiException("Thread has been 
interrupted.", e);
+                }
+            }
+            else
+                break;
+        }
+
+        return false;
+    }
+
+    /**
+     * Establishes connection to an address, sends message and returns the 
response (if any).
+     *
+     * @param msg Message to send.
+     * @param addr Address to send message to.
+     * @return Response read from the recipient or {@code null} if no response 
is supposed.
+     * @throws org.apache.ignite.spi.IgniteSpiException If an error occurs.
+     */
+    @Nullable private Integer sendMessageDirectly(TcpDiscoveryAbstractMessage 
msg, InetSocketAddress addr)
+        throws IgniteSpiException {
+        assert msg != null;
+        assert addr != null;
+
+        Collection<Throwable> errs = null;
+
+        Socket sock = null;
+
+        long ackTimeout0 = ackTimeout;
+
+        int connectAttempts = 1;
+
+        boolean joinReqSent = false;
+
+        for (int i = 0; i < reconCnt; i++) {
+            // Need to set to false on each new iteration,
+            // since remote node may leave in the middle of the first 
iteration.
+            joinReqSent = false;
+
+            boolean openSock = false;
+
+            try {
+                long tstamp = U.currentTimeMillis();
+
+                sock = openSocket(addr);
+
+                openSock = true;
+
+                // Handshake.
+                writeToSocket(sock, new 
TcpDiscoveryHandshakeRequest(locNodeId));
+
+                TcpDiscoveryHandshakeResponse res = readMessage(sock, null, 
ackTimeout0);
+
+                if (locNodeId.equals(res.creatorNodeId())) {
+                    if (log.isDebugEnabled())
+                        log.debug("Handshake response from local node: " + 
res);
+
+                    break;
+                }
+
+                stats.onClientSocketInitialized(U.currentTimeMillis() - 
tstamp);
+
+                // Send message.
+                tstamp = U.currentTimeMillis();
+
+                writeToSocket(sock, msg);
+
+                stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
+
+                if (debugMode)
+                    debugLog("Message has been sent directly to address [msg=" 
+ msg + ", addr=" + addr +
+                        ", rmtNodeId=" + res.creatorNodeId() + ']');
+
+                if (log.isDebugEnabled())
+                    log.debug("Message has been sent directly to address 
[msg=" + msg + ", addr=" + addr +
+                        ", rmtNodeId=" + res.creatorNodeId() + ']');
+
+                // Connection has been established, but
+                // join request may not be unmarshalled on remote host.
+                // E.g. due to class not found issue.
+                joinReqSent = msg instanceof TcpDiscoveryJoinRequestMessage;
+
+                return readReceipt(sock, ackTimeout0);
+            }
+            catch (ClassCastException e) {
+                // This issue is rarely reproducible on AmazonEC2, but never
+                // on dedicated machines.
+                if (log.isDebugEnabled())
+                    U.error(log, "Class cast exception on direct send: " + 
addr, e);
+
+                if (errs == null)
+                    errs = new ArrayList<>();
+
+                errs.add(e);
+            }
+            catch (IOException | GridException e) {
+                if (log.isDebugEnabled())
+                    log.error("Exception on direct send: " + e.getMessage(), 
e);
+
+                if (errs == null)
+                    errs = new ArrayList<>();
+
+                errs.add(e);
+
+                if (!openSock) {
+                    // Reconnect for the second time, if connection is not 
established.
+                    if (connectAttempts < 2) {
+                        connectAttempts++;
+
+                        continue;
+                    }
+
+                    break; // Don't retry if we can not establish connection.
+                }
+
+                if (e instanceof SocketTimeoutException || X.hasCause(e, 
SocketTimeoutException.class)) {
+                    ackTimeout0 *= 2;
+
+                    if (!checkAckTimeout(ackTimeout0))
+                        break;
+                }
+            }
+            finally {
+                U.closeQuiet(sock);
+            }
+        }
+
+        if (joinReqSent) {
+            if (log.isDebugEnabled())
+                log.debug("Join request has been sent, but receipt has not 
been read (returning RES_WAIT).");
+
+            // Topology will not include this node,
+            // however, warning on timed out join will be output.
+            return RES_OK;
+        }
+
+        throw new IgniteSpiException(
+            "Failed to send message to address [addr=" + addr + ", msg=" + msg 
+ ']',
+            U.exceptionWithSuppressed("Failed to send message to address " +
+                "[addr=" + addr + ", msg=" + msg + ']', errs));
+    }
+
+    /**
+     * Marshalls credentials with discovery SPI marshaller (will replace 
attribute value).
+     *
+     * @param node Node to marshall credentials for.
+     * @throws org.apache.ignite.spi.IgniteSpiException If marshalling failed.
+     */
+    private void marshalCredentials(TcpDiscoveryNode node) throws 
IgniteSpiException {
+        try {
+            // Use security-unsafe getter.
+            Map<String, Object> attrs = new HashMap<>(node.getAttributes());
+
+            attrs.put(GridNodeAttributes.ATTR_SECURITY_CREDENTIALS,
+                
marsh.marshal(attrs.get(GridNodeAttributes.ATTR_SECURITY_CREDENTIALS)));
+
+            node.setAttributes(attrs);
+        }
+        catch (GridException e) {
+            throw new IgniteSpiException("Failed to marshal node security 
credentials: " + node.id(), e);
+        }
+    }
+
+    /**
+     * Unmarshalls credentials with discovery SPI marshaller (will not replace 
attribute value).
+     *
+     * @param node Node to unmarshall credentials for.
+     * @return Security credentials.
+     * @throws org.apache.ignite.spi.IgniteSpiException If unmarshal fails.
+     */
+    private GridSecurityCredentials unmarshalCredentials(TcpDiscoveryNode 
node) throws IgniteSpiException {
+        try {
+            byte[] credBytes = 
(byte[])node.getAttributes().get(GridNodeAttributes.ATTR_SECURITY_CREDENTIALS);
+
+            if (credBytes == null)
+                return null;
+
+            return marsh.unmarshal(credBytes, null);
+        }
+        catch (GridException e) {
+            throw new IgniteSpiException("Failed to unmarshal node security 
credentials: " + node.id(), e);
+        }
+    }
+
+    /**
+     * @param ackTimeout Acknowledgement timeout.
+     * @return {@code True} if acknowledgement timeout is less or equal to
+     * maximum acknowledgement timeout, {@code false} otherwise.
+     */
+    private boolean checkAckTimeout(long ackTimeout) {
+        if (ackTimeout > maxAckTimeout) {
+            LT.warn(log, null, "Acknowledgement timeout is greater than 
maximum acknowledgement timeout " +
+                "(consider increasing 'maxAckTimeout' configuration property) 
" +
+                "[ackTimeout=" + ackTimeout + ", maxAckTimeout=" + 
maxAckTimeout + ']');
+
+            return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * Notify external listener on discovery event.
+     *
+     * @param type Discovery event type. See {@link 
org.apache.ignite.events.IgniteDiscoveryEvent} for more details.
+     * @param topVer Topology version.
+     * @param node Remote node this event is connected with.
+     */
+    private void notifyDiscovery(int type, long topVer, TcpDiscoveryNode node) 
{
+        assert type > 0;
+        assert node != null;
+
+        DiscoverySpiListener lsnr = this.lsnr;
+
+        TcpDiscoverySpiState spiState = spiStateCopy();
+
+        if (lsnr != null && node.visible() && (spiState == CONNECTED || 
spiState == DISCONNECTING)) {
+            if (log.isDebugEnabled())
+                log.debug("Discovery notification [node=" + node + ", 
spiState=" + spiState +
+                    ", type=" + U.gridEventName(type) + ", topVer=" + topVer + 
']');
+
+            Collection<ClusterNode> top = F.<TcpDiscoveryNode, 
ClusterNode>upcast(ring.visibleNodes());
+
+            Map<Long, Collection<ClusterNode>> hist = 
updateTopologyHistory(topVer, top);
+
+            lsnr.onDiscovery(type, topVer, node, top, hist);
+        }
+        else if (log.isDebugEnabled())
+            log.debug("Skipped discovery notification [node=" + node + ", 
spiState=" + spiState +
+                ", type=" + U.gridEventName(type) + ", topVer=" + topVer + 
']');
+    }
+
+    /**
+     * Update topology history with new topology snapshots.
+     *
+     * @param topVer Topology version.
+     * @param top Topology snapshot.
+     * @return Copy of updated topology history.
+     */
+    @Nullable private Map<Long, Collection<ClusterNode>> 
updateTopologyHistory(long topVer, Collection<ClusterNode> top) {
+        synchronized (mux) {
+            if (topHist.containsKey(topVer))
+                return null;
+
+            topHist.put(topVer, top);
+
+            while (topHist.size() > topHistSize)
+                topHist.remove(topHist.firstKey());
+
+            if (log.isDebugEnabled())
+                log.debug("Added topology snapshot to history, topVer=" + 
topVer + ", historySize=" + topHist.size());
+
+            return new TreeMap<>(topHist);
+        }
+    }
+
+    /**
+     * @param node Node.
+     * @return {@link LinkedHashSet} of internal and external addresses of 
provided node.
+     *      Internal addresses placed before external addresses.
+     */
+    @SuppressWarnings("TypeMayBeWeakened")
+    private LinkedHashSet<InetSocketAddress> getNodeAddresses(TcpDiscoveryNode 
node) {
+        LinkedHashSet<InetSocketAddress> res = new 
LinkedHashSet<>(node.socketAddresses());
+
+        Collection<InetSocketAddress> extAddrs = 
node.attribute(createSpiAttributeName(ATTR_EXT_ADDRS));
+
+        if (extAddrs != null)
+            res.addAll(extAddrs);
+
+        return res;
+    }
+
+    /**
+     * @param node Node.
+     * @param sameHost Same host flag.
+     * @return {@link LinkedHashSet} of internal and external addresses of 
provided node.
+     *      Internal addresses placed before external addresses.
+     *      Internal addresses will be sorted with {@code 
inetAddressesComparator(sameHost)}.
+     */
+    @SuppressWarnings("TypeMayBeWeakened")
+    private LinkedHashSet<InetSocketAddress> getNodeAddresses(TcpDiscoveryNode 
node, boolean sameHost) {
+        List<InetSocketAddress> addrs = U.arrayList(node.socketAddresses());
+
+        Collections.sort(addrs, U.inetAddressesComparator(sameHost));
+
+        LinkedHashSet<InetSocketAddress> res = new LinkedHashSet<>(addrs);
+
+        Collection<InetSocketAddress> extAddrs = 
node.attribute(createSpiAttributeName(ATTR_EXT_ADDRS));
+
+        if (extAddrs != null)
+            res.addAll(extAddrs);
+
+        return res;
+    }
+
+    /**
+     * Checks whether local node is coordinator. Nodes that are leaving or 
failed
+     * (but are still in topology) are removed from search.
+     *
+     * @return {@code true} if local node is coordinator.
+     */
+    private boolean isLocalNodeCoordinator() {
+        synchronized (mux) {
+            boolean crd = spiState == CONNECTED && 
locNode.equals(resolveCoordinator());
+
+            if (crd)
+                stats.onBecomingCoordinator();
+
+            return crd;
+        }
+    }
+
+    /**
+     * @return Spi state copy.
+     */
+    private TcpDiscoverySpiState spiStateCopy() {
+        TcpDiscoverySpiState state;
+
+        synchronized (mux) {
+            state = spiState;
+        }
+
+        return state;
+    }
+
+    /**
+     * Resolves coordinator. Nodes that are leaving or failed (but are still in
+     * topology) are removed from search.
+     *
+     * @return Coordinator node or {@code null} if there are no coordinator
+     * (i.e. local node is the last one and is currently stopping).
+     */
+    @Nullable private TcpDiscoveryNode resolveCoordinator() {
+        return resolveCoordinator(null);
+    }
+
+    /**
+     * Resolves coordinator. Nodes that are leaving or failed (but are still in
+     * topology) are removed from search as well as provided filter.
+     *
+     * @param filter Nodes to exclude when resolving coordinator (optional).
+     * @return Coordinator node or {@code null} if there are no coordinator
+     * (i.e. local node is the last one and is currently stopping).
+     */
+    @Nullable private TcpDiscoveryNode resolveCoordinator(
+        @Nullable Collection<TcpDiscoveryNode> filter) {
+        synchronized (mux) {
+            Collection<TcpDiscoveryNode> excluded = F.concat(false, 
failedNodes, leavingNodes);
+
+            if (!F.isEmpty(filter))
+                excluded = F.concat(false, excluded, filter);
+
+            return ring.coordinator(excluded);
+        }
+    }
+
+    /**
+     * Prints SPI statistics.
+     */
+    private void printStatistics() {
+        if (log.isInfoEnabled() && statsPrintFreq > 0) {
+            int failedNodesSize;
+            int leavingNodesSize;
+
+            synchronized (mux) {
+                failedNodesSize = failedNodes.size();
+                leavingNodesSize = leavingNodes.size();
+            }
+
+            Runtime runtime = Runtime.getRuntime();
+
+            TcpDiscoveryNode coord = resolveCoordinator();
+
+            log.info("Discovery SPI statistics [statistics=" + stats + ", 
spiState=" + spiStateCopy() +
+                ", coord=" + coord +
+                ", topSize=" + ring.allNodes().size() +
+                ", leavingNodesSize=" + leavingNodesSize + ", 
failedNodesSize=" + failedNodesSize +
+                ", msgWorker.queue.size=" + (msgWorker != null ? 
msgWorker.queueSize() : "N/A") +
+                ", lastUpdate=" + (locNode != null ? 
U.format(locNode.lastUpdateTime()) : "N/A") +
+                ", heapFree=" + runtime.freeMemory() / (1024 * 1024) +
+                "M, heapTotal=" + runtime.maxMemory() / (1024 * 1024) + "M]");
+        }
+    }
+
+    /**
+     * @param msg Message to prepare.
+     * @param destNodeId Destination node ID.
+     * @param msgs Messages to include.
+     * @param discardMsgId Discarded message ID.
+     */
+    private void prepareNodeAddedMessage(TcpDiscoveryAbstractMessage msg, UUID 
destNodeId,
+        @Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable 
IgniteUuid discardMsgId) {
+        assert destNodeId != null;
+
+        if (msg instanceof TcpDiscoveryNodeAddedMessage) {
+            TcpDiscoveryNodeAddedMessage nodeAddedMsg = 
(TcpDiscoveryNodeAddedMessage)msg;
+
+            TcpDiscoveryNode node = nodeAddedMsg.node();
+
+            if (node.id().equals(destNodeId)) {
+                Collection<TcpDiscoveryNode> allNodes = ring.allNodes();
+                Collection<TcpDiscoveryNode> topToSend = new 
ArrayList<>(allNodes.size());
+
+                for (TcpDiscoveryNode n0 : allNodes) {
+                    assert n0.internalOrder() != 0 : n0;
+
+                    // Skip next node and nodes added after next
+                    // in case this message is resent due to failures/leaves.
+                    // There will be separate messages for nodes with greater
+                    // internal order.
+                    if (n0.internalOrder() < 
nodeAddedMsg.node().internalOrder())
+                        topToSend.add(n0);
+                }
+
+                nodeAddedMsg.topology(topToSend);
+                nodeAddedMsg.messages(msgs, discardMsgId);
+
+                Map<Long, Collection<ClusterNode>> hist;
+
+                synchronized (mux) {
+                    hist = new TreeMap<>(topHist);
+                }
+
+                nodeAddedMsg.topologyHistory(hist);
+            }
+        }
+    }
+
+    /**
+     * @param msg Message to clear.
+     */
+    private void clearNodeAddedMessage(TcpDiscoveryAbstractMessage msg) {
+        if (msg instanceof TcpDiscoveryNodeAddedMessage) {
+            // Nullify topology before registration.
+            TcpDiscoveryNodeAddedMessage nodeAddedMsg = 
(TcpDiscoveryNodeAddedMessage)msg;
+
+            nodeAddedMsg.topology(null);
+            nodeAddedMsg.topologyHistory(null);
+            nodeAddedMsg.messages(null, null);
+        }
+    }
+
+    /**
+     * <strong>FOR TEST ONLY!!!</strong>
+     * <p>
+     * Simulates this node failure by stopping service threads. So, node will 
become
+     * unresponsive.
+     * <p>
+     * This method is intended for test purposes only.
+     */
+    void simulateNodeFailure() {
+        U.warn(log, "Simulating node failure: " + locNodeId);
+
+        U.interrupt(tcpSrvr);
+        U.join(tcpSrvr, log);
+
+        U.interrupt(hbsSnd);
+        U.join(hbsSnd, log);
+
+        U.interrupt(chkStatusSnd);
+        U.join(chkStatusSnd, log);
+
+        U.interrupt(ipFinderCleaner);
+        U.join(ipFinderCleaner, log);
+
+        Collection<SocketReader> tmp;
+
+        synchronized (mux) {
+            tmp = U.arrayList(readers);
+        }
+
+        U.interrupt(tmp);
+        U.joinThreads(tmp, log);
+
+        U.interrupt(msgWorker);
+        U.join(msgWorker, log);
+
+        U.interrupt(statsPrinter);
+        U.join(statsPrinter, log);
+    }
+
+    /**
+     * <strong>FOR TEST ONLY!!!</strong>
+     * <p>
+     * Simulates situation when next node is still alive but is bypassed
+     * since it has been excluded from the ring, possibly, due to short time
+     * network problems.
+     * <p>
+     * This method is intended for test purposes only.
+     */
+    void forceNextNodeFailure() {
+        U.warn(log, "Next node will be forcibly failed (if any).");
+
+        TcpDiscoveryNode next;
+
+        synchronized (mux) {
+            next = ring.nextNode(failedNodes);
+        }
+
+        if (next != null)
+            msgWorker.addMessage(new TcpDiscoveryNodeFailedMessage(locNodeId, 
next.id(), next.internalOrder()));
+    }
+
+    /**
+     * <strong>FOR TEST ONLY!!!</strong>
+     * <p>
+     * This method is intended for test purposes only.
+     *
+     * @param msg Message.
+     */
+    void onBeforeMessageSentAcrossRing(Serializable msg) {
+        // No-op.
+    }
+
+    /**
+     * <strong>FOR TEST ONLY!!!</strong>
+     * <p>
+     * This method is intended for test purposes only.
+     *
+     * @return Nodes ring.
+     */
+    TcpDiscoveryNodesRing ring() {
+        return ring;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void dumpDebugInfo() {
+        dumpDebugInfo(log);
+    }
+
+    /**
+     * @param log Logger.
+     */
+    public void dumpDebugInfo(IgniteLogger log) {
+        if (!debugMode) {
+            U.quietAndWarn(log, "Failed to dump debug info (discovery SPI was 
not configured " +
+                "in debug mode, consider setting 'debugMode' configuration 
property to 'true').");
+
+            return;
+        }
+
+        assert log.isInfoEnabled();
+
+        synchronized (mux) {
+            StringBuilder b = new StringBuilder(U.nl());
+
+            b.append(">>>").append(U.nl());
+            b.append(">>>").append("Dumping discovery SPI debug 
info.").append(U.nl());
+            b.append(">>>").append(U.nl());
+
+            b.append("Local node ID: 
").append(locNodeId).append(U.nl()).append(U.nl());
+            b.append("Local node: 
").append(locNode).append(U.nl()).append(U.nl());
+            b.append("SPI state: 
").append(spiState).append(U.nl()).append(U.nl());
+
+            b.append("Internal threads: ").append(U.nl());
+
+            b.append("    Message worker: 
").append(threadStatus(msgWorker)).append(U.nl());
+            b.append("    Check status sender: 
").append(threadStatus(chkStatusSnd)).append(U.nl());
+            b.append("    HB sender: 
").append(threadStatus(hbsSnd)).append(U.nl());
+            b.append("    Socket timeout worker: 
").append(threadStatus(sockTimeoutWorker)).append(U.nl());
+            b.append("    IP finder cleaner: 
").append(threadStatus(ipFinderCleaner)).append(U.nl());
+            b.append("    Stats printer: 
").append(threadStatus(statsPrinter)).append(U.nl());
+
+            b.append(U.nl());
+
+            b.append("Socket readers: ").append(U.nl());
+
+            for (SocketReader rdr : readers)
+                b.append("    ").append(rdr).append(U.nl());
+
+            b.append(U.nl());
+
+            b.append("In-memory log messages: ").append(U.nl());
+
+            for (String msg : debugLog)
+                b.append("    ").append(msg).append(U.nl());
+
+            b.append(U.nl());
+
+            b.append("Leaving nodes: ").append(U.nl());
+
+            for (TcpDiscoveryNode node : leavingNodes)
+                b.append("    ").append(node.id()).append(U.nl());
+
+            b.append(U.nl());
+
+            b.append("Failed nodes: ").append(U.nl());
+
+            for (TcpDiscoveryNode node : failedNodes)
+                b.append("    ").append(node.id()).append(U.nl());
+
+            b.append(U.nl());
+
+            b.append("Stats: ").append(stats).append(U.nl());
+
+            U.quietAndInfo(log, b.toString());
+        }
+    }
+
+    /**
+     * @param msg Message.
+     */
+    private void debugLog(String msg) {
+        assert debugMode;
+
+        String msg0 = new SimpleDateFormat("[HH:mm:ss,SSS]").format(new 
Date(System.currentTimeMillis())) +
+            '[' + Thread.currentThread().getName() + "][" + locNodeId + "-" + 
locNode.internalOrder() + "] " +
+            msg;
+
+        debugLog.add(msg0);
+
+        int delta = debugLog.size() - debugMsgHist;
+
+        for (int i = 0; i < delta && debugLog.size() > debugMsgHist; i++)
+            debugLog.poll();
+    }
+
+    /**
+     * @param msg Message.
+     * @return {@code True} if recordable in debug mode.
+     */
+    private boolean recordable(TcpDiscoveryAbstractMessage msg) {
+        return !(msg instanceof TcpDiscoveryHeartbeatMessage) &&
+            !(msg instanceof TcpDiscoveryStatusCheckMessage) &&
+            !(msg instanceof TcpDiscoveryDiscardMessage);
+    }
+
+    /**
+     * @param t Thread.
+     * @return Status as string.
+     */
+    private String threadStatus(Thread t) {
+        if (t == null)
+            return "N/A";
+
+        return t.isAlive() ? "alive" : "dead";
+    }
+
+    /**
+     * Checks if two given {@link GridSecurityPermissionSet} objects contain 
the same permissions.
+     * Each permission belongs to one of three groups : cache, task or system.
+     *
+     * @param locPerms The first set of permissions.
+     * @param rmtPerms The second set of permissions.
+     * @return {@code True} if given parameters contain the same permissions, 
{@code False} otherwise.
+     */
+    private boolean permissionsEqual(GridSecurityPermissionSet locPerms, 
GridSecurityPermissionSet rmtPerms) {
+        boolean dfltAllowMatch = !(locPerms.defaultAllowAll() ^ 
rmtPerms.defaultAllowAll());
+
+        boolean bothHaveSamePerms = 
F.eqNotOrdered(rmtPerms.systemPermissions(), locPerms.systemPermissions()) &&
+            F.eqNotOrdered(rmtPerms.cachePermissions(), 
locPerms.cachePermissions()) &&
+            F.eqNotOrdered(rmtPerms.taskPermissions(), 
locPerms.taskPermissions());
+
+        return dfltAllowMatch && bothHaveSamePerms;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(TcpDiscoverySpi.class, this);
+    }
+
+    /**
+     * Thread that sends heartbeats.
+     */
+    private class HeartbeatsSender extends IgniteSpiThread {
+        /**
+         * Constructor.
+         */
+        private HeartbeatsSender() {
+            super(gridName, "tcp-disco-hb-sender", log);
+
+            setPriority(threadPri);
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("BusyWait")
+        @Override protected void body() throws InterruptedException {
+            while (!isLocalNodeCoordinator())
+                Thread.sleep(1000);
+
+            if (log.isDebugEnabled())
+                log.debug("Heartbeats sender has been started.");
+
+            while (!isInterrupted()) {
+                if (spiStateCopy() != CONNECTED) {
+                    if (log.isDebugEnabled())
+                        log.debug("Stopping heartbeats sender (SPI is not 
connected to topology).");
+
+                    return;
+                }
+
+                TcpDiscoveryHeartbeatMessage msg = new 
TcpDiscoveryHeartbeatMessage(locNodeId);
+
+                msg.verify(locNodeId);
+
+                msgWorker.addMessage(msg);
+
+                Thread.sleep(hbFreq);
+            }
+        }
+    }
+
+    /**
+     * Thread that sends status check messages to next node if local node has 
not
+     * been receiving heartbeats ({@link 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHeartbeatMessage})
+     * for {@link TcpDiscoverySpi#getMaxMissedHeartbeats()} *
+     * {@link TcpDiscoverySpi#getHeartbeatFrequency()}.
+     */
+    private class CheckStatusSender extends IgniteSpiThread {
+        /**
+         * Constructor.
+         */
+        private CheckStatusSender() {
+            super(gridName, "tcp-disco-status-check-sender", log);
+
+            setPriority(threadPri);
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("BusyWait")
+        @Override protected void body() throws InterruptedException {
+            if (log.isDebugEnabled())
+                log.debug("Status check sender has been started.");
+
+            // Only 1 heartbeat missing is acceptable. 1 sec is added to avoid 
false alarm.
+            long checkTimeout = (long)maxMissedHbs * hbFreq + 1000;
+
+            long lastSent = 0;
+
+            while (!isInterrupted()) {
+                // 1. Determine timeout.
+                if (lastSent < locNode.lastUpdateTime())
+                    lastSent = locNode.lastUpdateTime();
+
+                long timeout = (lastSent + checkTimeout) - 
U.currentTimeMillis();
+
+                if (timeout > 0)
+                    Thread.sleep(timeout);
+
+                // 2. Check if SPI is still connected.
+                if (spiStateCopy() != CONNECTED) {
+                    if (log.isDebugEnabled())
+                        log.debug("Stopping status check sender (SPI is not 
connected to topology).");
+
+                    return;
+                }
+
+                // 3. Was there an update?
+                if (locNode.lastUpdateTime() > lastSent || 
!ring.hasRemoteNodes()) {
+                    if (log.isDebugEnabled())
+                        log.debug("Skipping status check send " +
+                            "[locNodeLastUpdate=" + 
U.format(locNode.lastUpdateTime()) +
+                            ", hasRmts=" + ring.hasRemoteNodes() + ']');
+
+                    continue;
+                }
+
+                // 4. Send status check message.
+                lastSent = U.currentTimeMillis();
+
+                msgWorker.addMessage(new 
TcpDiscoveryStatusCheckMessage(locNode, null));
+            }
+        }
+    }
+
+    /**
+     * Thread that cleans IP finder and keeps it in the correct state, 
unregistering
+     * addresses of the nodes that has left the topology.
+     * <p>
+     * This thread should run only on coordinator node and will clean IP finder
+     * if and only if {@link 
org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder#isShared()} 
is {@code true}.
+     */
+    private class IpFinderCleaner extends IgniteSpiThread {
+        /**
+         * Constructor.
+         */
+        private IpFinderCleaner() {
+            super(gridName, "tcp-disco-ip-finder-cleaner", log);
+
+            setPriority(threadPri);
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("BusyWait")
+        @Override protected void body() throws InterruptedException {
+            if (log.isDebugEnabled())
+                log.debug("IP finder cleaner has been started.");
+
+            while (!isInterrupted()) {
+                Thread.sleep(ipFinderCleanFreq);
+
+                if (!isLocalNodeCoordinator())
+                    continue;
+
+                if (spiStateCopy() != CONNECTED) {
+                    if (log.isDebugEnabled())
+                        log.debug("Stopping IP finder cleaner (SPI is not 
connected to topology).");
+
+                    return;
+                }
+
+                if (ipFinder.isShared())
+                    cleanIpFinder();
+            }
+        }
+
+        /**
+         * Cleans IP finder.
+         */
+        private void cleanIpFinder() {
+            assert ipFinder.isShared();
+
+            try {
+                // Addresses that belongs to nodes in topology.
+                Collection<InetSocketAddress> currAddrs = F.flatCollections(
+                    F.viewReadOnly(
+                        ring.allNodes(),
+                        new C1<TcpDiscoveryNode, 
Collection<InetSocketAddress>>() {
+                            @Override public Collection<InetSocketAddress> 
apply(TcpDiscoveryNode node) {
+                                return !node.isClient() ? 
getNodeAddresses(node) :
+                                    Collections.<InetSocketAddress>emptyList();
+                            }
+                        }
+                    )
+                );
+
+                // Addresses registered in IP finder.
+                Collection<InetSocketAddress> regAddrs = registeredAddresses();
+
+                // Remove all addresses that belong to alive nodes, leave 
dead-node addresses.
+                Collection<InetSocketAddress> rmvAddrs = F.view(
+                    regAddrs,
+                    F.notContains(currAddrs),
+                    new P1<InetSocketAddress>() {
+                        private final Map<InetSocketAddress, Boolean> 
pingResMap =
+                            new HashMap<>();
+
+                        @Override public boolean apply(InetSocketAddress addr) 
{
+                            Boolean res = pingResMap.get(addr);
+
+                            if (res == null) {
+                                try {
+                                    res = pingNode(addr, null).get1() != null;
+                                }
+                                catch (GridException e) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Failed to ping node [addr=" 
+ addr +
+                                            ", err=" + e.getMessage() + ']');
+
+                                    res = false;
+                                }
+                                finally {
+                                    pingResMap.put(addr, res);
+                                }
+                            }
+
+                            return !res;
+                        }
+                    }
+                );
+
+                // Unregister dead-nodes addresses.
+                if (!rmvAddrs.isEmpty()) {
+                    ipFinder.unregisterAddresses(rmvAddrs);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Unregistered addresses from IP finder: " + 
rmvAddrs);
+                }
+
+                // Addresses that were removed by mistake (e.g. on 
segmentation).
+                Collection<InetSocketAddress> missingAddrs = F.view(
+                    currAddrs,
+                    F.notContains(regAddrs)
+                );
+
+                // Re-register missing addresses.
+                if (!missingAddrs.isEmpty()) {
+                    ipFinder.registerAddresses(missingAddrs);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Registered missing addresses in IP finder: 
" + missingAddrs);
+                }
+            }
+            catch (IgniteSpiException e) {
+                LT.error(log, e, "Failed to clean IP finder up.");
+            }
+        }
+    }
+
+    /**
+     * Pending messages container.
+     */
+    private static class PendingMessages {
+        /** */
+        private static final int MAX = 1024;
+
+        /** Pending messages. */
+        private final Queue<TcpDiscoveryAbstractMessage> msgs = new 
ArrayDeque<>(MAX * 2);
+
+        /** Discarded message ID. */
+        private IgniteUuid discardId;
+
+        /**
+         * Adds pending message and shrinks queue if it exceeds limit
+         * (messages that were not discarded yet are never removed).
+         *
+         * @param msg Message to add.
+         */
+        void add(TcpDiscoveryAbstractMessage msg) {
+            msgs.add(msg);
+
+            while (msgs.size() > MAX) {
+                TcpDiscoveryAbstractMessage polled = msgs.poll();
+
+                assert polled != null;
+
+                if (polled.id().equals(discardId))
+                    break;
+            }
+        }
+
+        /**
+         * Gets messages starting from provided ID (exclusive). If such
+         * message is not found, {@code null} is returned (this indicates
+         * a failure condition when it was already removed from queue).
+         *
+         * @param lastMsgId Last message ID.
+         * @return Collection of messages.
+         */
+        @Nullable Collection<TcpDiscoveryAbstractMessage> messages(IgniteUuid 
lastMsgId) {
+            assert lastMsgId != null;
+
+            Collection<TcpDiscoveryAbstractMessage> copy = new 
ArrayList<>(msgs.size());
+
+            boolean skip = true;
+
+            for (TcpDiscoveryAbstractMessage msg : msgs) {
+                if (skip) {
+                    if (msg.id().equals(lastMsgId))
+                        skip = false;
+                }
+                else
+                    copy.add(msg);
+            }
+
+            return !skip ? copy : null;
+        }
+
+        /**
+         * Resets pending messages.
+         *
+         * @param msgs Message.
+         * @param discardId Discarded message ID.
+         */
+        void reset(@Nullable Collection<TcpDiscoveryAbstractMessage> msgs, 
@Nullable IgniteUuid discardId) {
+            this.msgs.clear();
+
+            if (msgs != null)
+                this.msgs.addAll(msgs);
+
+            this.discardId = discardId;
+        }
+
+        /**
+         * Clears pending messages.
+         */
+        void clear() {
+            msgs.clear();
+
+            discardId = null;
+        }
+
+        /**
+         * Discards message with provided ID and all before it.
+         *
+         * @param id Discarded message ID.
+         */
+        void discard(IgniteUuid id) {
+            discardId = id;
+        }
+    }
+
+    /**
+     * Message worker thread for messages

<TRUNCATED>

Reply via email to