http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java new file mode 100644 index 0000000..32d3f54 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -0,0 +1,5144 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.spi.discovery.tcp; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.marshaller.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.*; +import org.gridgain.grid.*; +import org.gridgain.grid.kernal.*; +import org.gridgain.grid.kernal.managers.security.*; +import org.gridgain.grid.security.*; +import org.apache.ignite.spi.discovery.*; +import org.apache.ignite.spi.discovery.tcp.internal.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.*; +import org.apache.ignite.spi.discovery.tcp.messages.*; +import org.gridgain.grid.util.*; +import org.gridgain.grid.util.future.*; +import org.gridgain.grid.util.lang.*; +import org.gridgain.grid.util.tostring.*; +import org.gridgain.grid.util.typedef.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.jdk8.backport.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.net.*; +import java.text.*; +import java.util.*; +import java.util.concurrent.*; + 
+import static org.apache.ignite.events.IgniteEventType.*;
+import static org.gridgain.grid.kernal.GridNodeAttributes.*;
+import static org.apache.ignite.spi.IgnitePortProtocol.*;
+import static org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoverySpiState.*;
+import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHeartbeatMessage.*;
+import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessage.*;
+
+/**
+ * Discovery SPI implementation that uses TCP/IP for node discovery.
+ * <p>
+ * Nodes are organized in a ring, so almost all network exchange (except for a few cases) is
+ * done across the ring.
+ * <p>
+ * At startup, the SPI sends a message about its own start to a random address taken from
+ * {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder} (it stops as soon as a send succeeds),
+ * and this information is then delivered to the coordinator. The coordinator processes the join request
+ * and issues node-added messages, through which all other nodes receive info about the new node.
+ * <h1 class="header">Configuration</h1>
+ * <h2 class="header">Mandatory</h2>
+ * There are no mandatory configuration parameters.
+ * <h2 class="header">Optional</h2>
+ * The following configuration parameters are optional:
+ * <ul>
+ * <li>IP finder to share info about nodes IP addresses
+ * (see {@link #setIpFinder(org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder)}).
+ * See the following IP finder implementations for details on configuration:
+ * <ul>
+ * <li>{@link org.apache.ignite.spi.discovery.tcp.ipfinder.sharedfs.TcpDiscoverySharedFsIpFinder}</li>
+ * <li>{@gglink org.gridgain.grid.spi.discovery.tcp.ipfinder.s3.GridTcpDiscoveryS3IpFinder}</li>
+ * <li>{@link org.apache.ignite.spi.discovery.tcp.ipfinder.jdbc.TcpDiscoveryJdbcIpFinder}</li>
+ * <li>{@link org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder}</li>
+ * <li>{@link org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder} - default</li>
+ * </ul>
+ * </li>
+ * <li>Local address (see {@link #setLocalAddress(String)})</li>
+ * <li>Local port to bind to (see {@link #setLocalPort(int)})</li>
+ * <li>Local port range to try binding to if previous ports are in use
+ *      (see {@link #setLocalPortRange(int)})</li>
+ * <li>Heartbeat frequency (see {@link #setHeartbeatFrequency(long)})</li>
+ * <li>Max missed heartbeats (see {@link #setMaxMissedHeartbeats(int)})</li>
+ * <li>Number of times node tries to (re)establish connection to another node
+ *      (see {@link #setReconnectCount(int)})</li>
+ * <li>Network timeout (see {@link #setNetworkTimeout(long)})</li>
+ * <li>Socket timeout (see {@link #setSocketTimeout(long)})</li>
+ * <li>Message acknowledgement timeout (see {@link #setAckTimeout(long)})</li>
+ * <li>Maximum message acknowledgement timeout (see {@link #setMaxAckTimeout(long)})</li>
+ * <li>Join timeout (see {@link #setJoinTimeout(long)})</li>
+ * <li>Thread priority for threads started by SPI (see {@link #setThreadPriority(int)})</li>
+ * <li>IP finder clean frequency (see {@link #setIpFinderCleanFrequency(long)})</li>
+ * <li>Statistics print frequency (see {@link #setStatisticsPrintFrequency(long)})</li>
+ * </ul>
+ * <h2 class="header">Java Example</h2>
+ * <pre name="code" class="java">
+ * TcpDiscoverySpi spi = new TcpDiscoverySpi();
+ *
+ * TcpDiscoveryVmIpFinder finder =
+ *     new TcpDiscoveryVmIpFinder();
+ *
+ * spi.setIpFinder(finder);
+ *
+ * GridConfiguration cfg = new GridConfiguration();
+ *
+ * // Override default discovery SPI.
+ * cfg.setDiscoverySpi(spi);
+ *
+ * // Start grid.
+ * GridGain.start(cfg);
+ * </pre>
+ * <h2 class="header">Spring Example</h2>
+ * TcpDiscoverySpi can be configured from Spring XML configuration file:
+ * <pre name="code" class="xml">
+ * <bean id="grid.custom.cfg" class="org.gridgain.grid.GridConfiguration" singleton="true">
+ *         ...
+ *         <property name="discoverySpi">
+ *             <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
+ *                 <property name="ipFinder">
+ *                     <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder" />
+ *                 </property>
+ *             </bean>
+ *         </property>
+ *         ...
+ * </bean>
+ * </pre>
+ * <p>
+ * <img src="http://www.gridgain.com/images/spring-small.png">
+ * <br>
+ * For information about Spring framework visit <a href="http://www.springframework.org/">www.springframework.org</a>
+ * @see org.apache.ignite.spi.discovery.DiscoverySpi
+ */
+@SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
+@IgniteSpiMultipleInstancesSupport(true)
+@DiscoverySpiOrderSupport(true)
+@DiscoverySpiHistorySupport(true)
+public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscoverySpiMBean {
+    /** Default local port range (value is <tt>100</tt>). */
+    public static final int DFLT_PORT_RANGE = 100;
+
+    /** Default timeout for joining topology (value is <tt>0</tt>, meaning wait forever). */
+    public static final long DFLT_JOIN_TIMEOUT = 0;
+
+    /** Default reconnect attempts count (value is <tt>10</tt>). */
+    public static final int DFLT_RECONNECT_CNT = 10;
+
+    /** Default max heartbeats count node can miss without initiating status check (value is <tt>1</tt>). */
+    public static final int DFLT_MAX_MISSED_HEARTBEATS = 1;
+
+    /** Default max heartbeats count node can miss without failing client node (value is <tt>5</tt>).
*/
+    public static final int DFLT_MAX_MISSED_CLIENT_HEARTBEATS = 5;
+
+    /** Default IP finder clean frequency in milliseconds (value is <tt>60,000ms</tt>). */
+    public static final long DFLT_IP_FINDER_CLEAN_FREQ = 60 * 1000;
+
+    /** Default statistics print frequency in milliseconds (value is <tt>0ms</tt>, meaning statistics are not printed). */
+    public static final long DFLT_STATS_PRINT_FREQ = 0;
+
+    /** Default maximum timeout for receiving message acknowledgement in milliseconds (value is <tt>600,000ms</tt>). */
+    public static final long DFLT_MAX_ACK_TIMEOUT = 10 * 60 * 1000;
+
+    /** Node attribute that is mapped to node's external addresses (value is <tt>disc.tcp.ext-addrs</tt>). */
+    public static final String ATTR_EXT_ADDRS = "disc.tcp.ext-addrs";
+
+    /** Address resolver (used on start to compute external addresses, if set). */
+    private IgniteAddressResolver addrRslvr;
+
+    /** Configured local port to bind to (first port of the range). */
+    private int locPort = DFLT_PORT;
+
+    /** Local port range. */
+    private int locPortRange = DFLT_PORT_RANGE;
+
+    /** Statistics print frequency. */
+    @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized", "RedundantFieldInitialization"})
+    private long statsPrintFreq = DFLT_STATS_PRINT_FREQ;
+
+    /** Maximum message acknowledgement timeout. */
+    private long maxAckTimeout = DFLT_MAX_ACK_TIMEOUT;
+
+    /** Join timeout. */
+    @SuppressWarnings("RedundantFieldInitialization")
+    private long joinTimeout = DFLT_JOIN_TIMEOUT;
+
+    /** Max heartbeats count node can miss without initiating status check. */
+    private int maxMissedHbs = DFLT_MAX_MISSED_HEARTBEATS;
+
+    /** Max heartbeats count node can miss without failing client node. */
+    private int maxMissedClientHbs = DFLT_MAX_MISSED_CLIENT_HEARTBEATS;
+
+    /** IP finder clean frequency. */
+    @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
+    private long ipFinderCleanFreq = DFLT_IP_FINDER_CLEAN_FREQ;
+
+    /** Reconnect attempts count. */
+    @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
+    private int reconCnt = DFLT_RECONNECT_CNT;
+
+    /** Grid marshaller (injected via {@link IgniteMarshallerResource}). */
+    @IgniteMarshallerResource
+    private IgniteMarshaller gridMarsh;
+
+    /** Nodes ring. */
+    @GridToStringExclude
+    private final TcpDiscoveryNodesRing ring = new TcpDiscoveryNodesRing();
+
+    /** Topology snapshots history. */
+    private final SortedMap<Long, Collection<ClusterNode>> topHist = new TreeMap<>();
+
+    /** Socket readers (guarded by {@code mux} when copied on stop). */
+    private final Collection<SocketReader> readers = new LinkedList<>();
+
+    /** TCP server for discovery SPI. */
+    private TcpServer tcpSrvr;
+
+    /** Ring message worker. */
+    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+    private RingMessageWorker msgWorker;
+
+    /** Client message workers. */
+    private ConcurrentMap<UUID, ClientMessageWorker> clientMsgWorkers = new ConcurrentHashMap8<>();
+
+    /** Heartbeats sender. */
+    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+    private HeartbeatsSender hbsSnd;
+
+    /** Status check sender. */
+    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+    private CheckStatusSender chkStatusSnd;
+
+    /** IP finder cleaner (started only for shared IP finders). */
+    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+    private IpFinderCleaner ipFinderCleaner;
+
+    /** Statistics printer thread. */
+    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+    private StatisticsPrinter statsPrinter;
+
+    /** Failed nodes (but still in topology). */
+    private Collection<TcpDiscoveryNode> failedNodes = new HashSet<>();
+
+    /** Leaving nodes (but still in topology). */
+    private Collection<TcpDiscoveryNode> leavingNodes = new HashSet<>();
+
+    /** If non-shared IP finder is used this flag shows whether IP finder contains local address. */
+    private boolean ipFinderHasLocAddr;
+
+    /** Addresses that did not respond while join requests were being sent (used for resolving concurrent start).
*/
+    private final Collection<SocketAddress> noResAddrs = new GridConcurrentHashSet<>();
+
+    /** Addresses from which incoming join requests were sent (used for resolving concurrent start). */
+    private final Collection<SocketAddress> fromAddrs = new GridConcurrentHashSet<>();
+
+    /** Response on join request from coordinator (in case of duplicate ID or auth failure). */
+    private final GridTuple<TcpDiscoveryAbstractMessage> joinRes = F.t1();
+
+    /** Context initialization latch (counted down once the SPI context is initialized or on stop). */
+    @GridToStringExclude
+    private final CountDownLatch ctxInitLatch = new CountDownLatch(1);
+
+    /** Node authenticator. */
+    private DiscoverySpiNodeAuthenticator nodeAuth;
+
+    /** Mutex guarding SPI state and related bookkeeping. */
+    private final Object mux = new Object();
+
+    /** In-progress ping requests keyed by target address (concurrent pings of one address share a future). */
+    private final ConcurrentMap<InetSocketAddress, IgniteFuture<IgniteBiTuple<UUID, Boolean>>> pingMap =
+        new ConcurrentHashMap8<>();
+
+    /** Debug mode. */
+    private boolean debugMode;
+
+    /** Debug message history size. */
+    private int debugMsgHist = 512;
+
+    /** Debug log of messages (created on start only when debug mode is enabled). */
+    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+    private ConcurrentLinkedDeque<String> debugLog;
+
+    /**
+     * Sets address resolver.
+     * <p>
+     * A value that is already set (e.g. from Spring or by the user) is not overridden by injection.
+     *
+     * @param addrRslvr Address resolver.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    @IgniteAddressResolverResource
+    public void setAddressResolver(IgniteAddressResolver addrRslvr) {
+        // Injection should not override value already set by Spring or user.
+        if (this.addrRslvr == null)
+            this.addrRslvr = addrRslvr;
+    }
+
+    /**
+     * Gets address resolver.
+     *
+     * @return Address resolver.
+     */
+    public IgniteAddressResolver getAddressResolver() {
+        return addrRslvr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getReconnectCount() {
+        return reconCnt;
+    }
+
+    /**
+     * Number of times node tries to (re)establish connection to another node.
+     * <p>
+     * Note that SPI implementation will increase {@link #ackTimeout} by factor 2
+     * on every retry.
+     * <p>
+     * If not specified, default is {@link #DFLT_RECONNECT_CNT}.
+     *
+     * @param reconCnt Number of retries during message sending.
+     * @see #setAckTimeout(long)
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public void setReconnectCount(int reconCnt) {
+        this.reconCnt = reconCnt;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getMaxAckTimeout() {
+        return maxAckTimeout;
+    }
+
+    /**
+     * Sets maximum timeout for receiving acknowledgement for sent message.
+     * <p>
+     * If acknowledgement is not received within the current ack timeout, sending is considered as failed
+     * and SPI tries to repeat message sending. Every time SPI retries message sending, the ack
+     * timeout is increased. If no acknowledgement is received and {@code maxAckTimeout}
+     * is reached, then the process of message sending is considered as failed.
+     * <p>
+     * If not specified, default is {@link #DFLT_MAX_ACK_TIMEOUT}. Must be greater than the ack timeout.
+     *
+     * @param maxAckTimeout Maximum acknowledgement timeout.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public void setMaxAckTimeout(long maxAckTimeout) {
+        this.maxAckTimeout = maxAckTimeout;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getJoinTimeout() {
+        return joinTimeout;
+    }
+
+    /**
+     * Sets join timeout.
+     * <p>
+     * If non-shared IP finder is used and node fails to connect to
+     * any address from IP finder, node keeps trying to join within this
+     * timeout. If all addresses are still unresponsive, exception is thrown
+     * and node startup fails.
+     * <p>
+     * If not specified, default is {@link #DFLT_JOIN_TIMEOUT}.
+     *
+     * @param joinTimeout Join timeout ({@code 0} means wait forever).
+     *
+     * @see org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder#isShared()
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public void setJoinTimeout(long joinTimeout) {
+        this.joinTimeout = joinTimeout;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getLocalPort() {
+        // Port actually used by the local node (may differ from the configured one
+        // within the port range); 0 if the local node has not been created yet.
+        TcpDiscoveryNode locNode0 = locNode;
+
+        return locNode0 != null ? locNode0.discoveryPort() : 0;
+    }
+
+    /**
+     * Sets local port to listen to.
+     * <p>
+     * If not specified, default is {@link #DFLT_PORT}. Must be greater than {@code 1023}.
+     *
+     * @param locPort Local port to bind.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public void setLocalPort(int locPort) {
+        this.locPort = locPort;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getLocalPortRange() {
+        return locPortRange;
+    }
+
+    /**
+     * Range for local ports. Local node will try to bind on first available port
+     * starting from the configured local port (see {@link #setLocalPort(int)}) up until
+     * <tt>localPort {@code + locPortRange}</tt>.
+     * <p>
+     * If not specified, default is {@link #DFLT_PORT_RANGE}.
+     *
+     * @param locPortRange Local port range to bind.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public void setLocalPortRange(int locPortRange) {
+        this.locPortRange = locPortRange;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getMaxMissedHeartbeats() {
+        return maxMissedHbs;
+    }
+
+    /**
+     * Sets max heartbeats count node can miss without initiating status check.
+     * <p>
+     * If not provided, default value is {@link #DFLT_MAX_MISSED_HEARTBEATS}.
+     *
+     * @param maxMissedHbs Max missed heartbeats.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public void setMaxMissedHeartbeats(int maxMissedHbs) {
+        this.maxMissedHbs = maxMissedHbs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getMaxMissedClientHeartbeats() {
+        return maxMissedClientHbs;
+    }
+
+    /**
+     * Sets max heartbeats count node can miss without failing client node.
+     * <p>
+     * If not provided, default value is {@link #DFLT_MAX_MISSED_CLIENT_HEARTBEATS}.
+     *
+     * @param maxMissedClientHbs Max missed client heartbeats.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public void setMaxMissedClientHeartbeats(int maxMissedClientHbs) {
+        this.maxMissedClientHbs = maxMissedClientHbs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getStatisticsPrintFrequency() {
+        return statsPrintFreq;
+    }
+
+    /**
+     * Sets statistics print frequency.
+     * <p>
+     * If not set default value is {@link #DFLT_STATS_PRINT_FREQ}.
+     * 0 indicates that no print is required. If value is greater than 0 and INFO logging
+     * is enabled then statistics are printed out with INFO level.
+     * <p>
+     * This may be very helpful for tracing topology problems.
+     *
+     * @param statsPrintFreq Statistics print frequency in milliseconds.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public void setStatisticsPrintFrequency(long statsPrintFreq) {
+        this.statsPrintFreq = statsPrintFreq;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getIpFinderCleanFrequency() {
+        return ipFinderCleanFreq;
+    }
+
+    /**
+     * Sets IP finder clean frequency in milliseconds.
+     * <p>
+     * If not provided, default value is {@link #DFLT_IP_FINDER_CLEAN_FREQ}.
+     *
+     * @param ipFinderCleanFreq IP finder clean frequency.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public void setIpFinderCleanFrequency(long ipFinderCleanFreq) {
+        this.ipFinderCleanFreq = ipFinderCleanFreq;
+    }
+
+    /**
+     * This method is intended for troubleshooting purposes only.
+     *
+     * @param debugMode {@code True} to start SPI in debug mode.
+     */
+    public void setDebugMode(boolean debugMode) {
+        this.debugMode = debugMode;
+    }
+
+    /**
+     * This method is intended for troubleshooting purposes only.
+     *
+     * @param debugMsgHist Message history log size.
+     */
+    public void setDebugMessageHistory(int debugMsgHist) {
+        this.debugMsgHist = debugMsgHist;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getSpiState() {
+        synchronized (mux) {
+            return spiState.name();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getSocketTimeout() {
+        return sockTimeout;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getAckTimeout() {
+        return ackTimeout;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getNetworkTimeout() {
+        return netTimeout;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getThreadPriority() {
+        return threadPri;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getHeartbeatFrequency() {
+        return hbFreq;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getIpFinderFormatted() {
+        return ipFinder.toString();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getMessageWorkerQueueSize() {
+        // The MBean is registered in onSpiStart() before the message worker is created,
+        // so this can be invoked while the worker is still null.
+        RingMessageWorker msgWorker0 = msgWorker;
+
+        return msgWorker0 != null ? msgWorker0.queueSize() : 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getNodesJoined() {
+        return stats.joinedNodesCount();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getNodesLeft() {
+        return stats.leftNodesCount();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getNodesFailed() {
+        return stats.failedNodesCount();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getPendingMessagesRegistered() {
+        return stats.pendingMessagesRegistered();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getPendingMessagesDiscarded() {
+        return stats.pendingMessagesDiscarded();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getAvgMessageProcessingTime() {
+        return stats.avgMessageProcessingTime();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getMaxMessageProcessingTime() {
+        return stats.maxMessageProcessingTime();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getTotalReceivedMessages() {
+        return stats.totalReceivedMessages();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<String, Integer> getReceivedMessages() {
+        return stats.receivedMessages();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getTotalProcessedMessages() {
+        return stats.totalProcessedMessages();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<String, Integer> getProcessedMessages() {
+        return stats.processedMessages();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getCoordinatorSinceTimestamp() {
+        return stats.coordinatorSinceTimestamp();
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public UUID getCoordinator() {
+        TcpDiscoveryNode crd = resolveCoordinator();
+
+        // resolveCoordinator() may return null, in which case no coordinator ID is reported.
+        return crd != null ? crd.id() : null;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public ClusterNode getNode(UUID nodeId) {
+        assert nodeId != null;
+
+        UUID locNodeId0 = locNodeId;
+
+        if (locNodeId0 != null && locNodeId0.equals(nodeId))
+            // Return local node directly.
+            return locNode;
+
+        TcpDiscoveryNode node = ring.node(nodeId);
+
+        // Nodes that are present in the ring but not yet visible are not exposed.
+        if (node != null && !node.visible())
+            return null;
+
+        return node;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<ClusterNode> getRemoteNodes() {
+        return F.<TcpDiscoveryNode, ClusterNode>upcast(ring.visibleRemoteNodes());
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<Object> injectables() {
+        // Only the IP finder (when configured) is returned.
+        Collection<Object> res = new LinkedList<>();
+
+        if (ipFinder != null)
+            res.add(ipFinder);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void spiStart(String gridName) throws IgniteSpiException {
+        spiStart0(false);
+    }
+
+    /**
+     * Starts or restarts SPI after stop (to reconnect).
+     * <p>
+     * On initial start the configuration is validated and the MBean is registered (see {@code onSpiStart()});
+     * on restart the TCP discovery port is re-registered with the SPI context at the end of the start sequence.
+     *
+     * @param restart {@code True} if SPI is restarted after stop.
+     * @throws org.apache.ignite.spi.IgniteSpiException If failed.
+     */
+    private void spiStart0(boolean restart) throws IgniteSpiException {
+        if (!restart)
+            // It is initial start.
+            onSpiStart();
+
+        // Both initial start and restart begin from DISCONNECTED state.
+        synchronized (mux) {
+            spiState = DISCONNECTED;
+        }
+
+        if (debugMode) {
+            if (!log.isInfoEnabled())
+                throw new IgniteSpiException("Info log level should be enabled for TCP discovery to work " +
+                    "in debug mode.");
+
+            debugLog = new ConcurrentLinkedDeque<>();
+
+            U.quietAndWarn(log, "TCP discovery SPI is configured in debug mode.");
+        }
+
+        // Clear addresses collections.
+        fromAddrs.clear();
+        noResAddrs.clear();
+
+        sockTimeoutWorker = new SocketTimeoutWorker();
+        sockTimeoutWorker.start();
+
+        msgWorker = new RingMessageWorker();
+        msgWorker.start();
+
+        // The server's port (tcpSrvr.port) is read below, before the server thread itself is started.
+        tcpSrvr = new TcpServer();
+
+        // Init local node.
+        IgniteBiTuple<Collection<String>, Collection<String>> addrs;
+
+        try {
+            addrs = U.resolveLocalAddresses(locHost);
+        }
+        catch (IOException | GridException e) {
+            throw new IgniteSpiException("Failed to resolve local host to set of external addresses: " + locHost, e);
+        }
+
+        locNode = new TcpDiscoveryNode(
+            locNodeId,
+            addrs.get1(),
+            addrs.get2(),
+            tcpSrvr.port,
+            metricsProvider,
+            locNodeVer);
+
+        try {
+            // External addresses are published as a node attribute only when an address resolver is set.
+            Collection<InetSocketAddress> extAddrs = addrRslvr == null ? null :
+                U.resolveAddresses(addrRslvr, F.flat(Arrays.asList(addrs.get1(), addrs.get2())),
+                    locNode.discoveryPort());
+
+            if (extAddrs != null)
+                locNodeAttrs.put(createSpiAttributeName(ATTR_EXT_ADDRS), extAddrs);
+        }
+        catch (GridException e) {
+            throw new IgniteSpiException("Failed to resolve local host to addresses: " + locHost, e);
+        }
+
+        locNode.setAttributes(locNodeAttrs);
+
+        locNode.local(true);
+
+        locNodeAddrs = getNodeAddresses(locNode);
+
+        if (log.isDebugEnabled())
+            log.debug("Local node initialized: " + locNode);
+
+        // Start TCP server thread after local node is initialized.
+        tcpSrvr.start();
+
+        ring.localNode(locNode);
+
+        if (ipFinder.isShared())
+            registerLocalNodeAddress();
+        else {
+            // A non-shared IP finder cannot learn addresses at runtime, so it must be pre-populated.
+            if (F.isEmpty(ipFinder.getRegisteredAddresses()))
+                throw new IgniteSpiException("Non-shared IP finder must have IP addresses specified in " +
+                    "GridTcpDiscoveryIpFinder.getRegisteredAddresses() configuration property " +
+                    "(specify list of IP addresses in configuration).");
+
+            ipFinderHasLocAddr = ipFinderHasLocalAddress();
+        }
+
+        if (statsPrintFreq > 0 && log.isInfoEnabled()) {
+            statsPrinter = new StatisticsPrinter();
+            statsPrinter.start();
+        }
+
+        stats.onJoinStarted();
+
+        joinTopology();
+
+        stats.onJoinFinished();
+
+        // Background workers below are started only after the node has joined the topology.
+        hbsSnd = new HeartbeatsSender();
+        hbsSnd.start();
+
+        chkStatusSnd = new CheckStatusSender();
+        chkStatusSnd.start();
+
+        if (ipFinder.isShared()) {
+            ipFinderCleaner = new IpFinderCleaner();
+            ipFinderCleaner.start();
+        }
+
+        if (log.isDebugEnabled() && !restart)
+            log.debug(startInfo());
+
+        if (restart)
+            getSpiContext().registerPort(tcpSrvr.port, TCP);
+    }
+
+    /**
+     * Registers the local node's socket addresses in the (shared) IP finder.
+     * <p>
+     * An {@link IllegalStateException} from the IP finder fails immediately, while an
+     * {@link IgniteSpiException} is logged and registration is retried every 2000 ms until it succeeds.
+     *
+     * @throws org.apache.ignite.spi.IgniteSpiException If failed.
+     */
+    @SuppressWarnings("BusyWait")
+    private void registerLocalNodeAddress() throws IgniteSpiException {
+        // Make sure address registration succeeded.
+        while (true) {
+            try {
+                ipFinder.initializeLocalAddresses(locNode.socketAddresses());
+
+                // Success.
+                break;
+            }
+            catch (IllegalStateException e) {
+                throw new IgniteSpiException("Failed to register local node address with IP finder: " +
+                    locNode.socketAddresses(), e);
+            }
+            catch (IgniteSpiException e) {
+                LT.error(log, e, "Failed to register local node address in IP finder on start " +
+                    "(retrying every 2000 ms).");
+            }
+
+            try {
+                U.sleep(2000);
+            }
+            catch (GridInterruptedException e) {
+                throw new IgniteSpiException("Thread has been interrupted.", e);
+            }
+        }
+    }
+
+    /**
+     * Performs one-time initialization on initial SPI start: validates configuration parameters,
+     * resolves the local host, logs the configuration in debug mode and registers the SPI MBean.
+     *
+     * @throws org.apache.ignite.spi.IgniteSpiException If failed.
+     */
+    private void onSpiStart() throws IgniteSpiException {
+        startStopwatch();
+
+        // Validate configuration before any thread or socket is created.
+        assertParameter(ipFinder != null, "ipFinder != null");
+        assertParameter(ipFinderCleanFreq > 0, "ipFinderCleanFreq > 0");
+        assertParameter(locPort > 1023, "localPort > 1023");
+        assertParameter(locPortRange >= 0, "localPortRange >= 0");
+        assertParameter(locPort + locPortRange <= 0xffff, "locPort + locPortRange <= 0xffff");
+        assertParameter(netTimeout > 0, "networkTimeout > 0");
+        assertParameter(sockTimeout > 0, "sockTimeout > 0");
+        assertParameter(ackTimeout > 0, "ackTimeout > 0");
+        assertParameter(maxAckTimeout > ackTimeout, "maxAckTimeout > ackTimeout");
+        assertParameter(reconCnt > 0, "reconnectCnt > 0");
+        assertParameter(hbFreq > 0, "heartbeatFreq > 0");
+        assertParameter(maxMissedHbs > 0, "maxMissedHeartbeats > 0");
+        assertParameter(maxMissedClientHbs > 0, "maxMissedClientHeartbeats > 0");
+        assertParameter(threadPri > 0, "threadPri > 0");
+        assertParameter(statsPrintFreq >= 0, "statsPrintFreq >= 0");
+
+        try {
+            locHost = U.resolveLocalHost(locAddr);
+        }
+        catch (IOException e) {
+            throw new IgniteSpiException("Unknown local address: " + locAddr, e);
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug(configInfo("localHost", locHost.getHostAddress()));
+            log.debug(configInfo("localPort", locPort));
+            log.debug(configInfo("localPortRange", locPortRange));
+            log.debug(configInfo("threadPri", threadPri));
+            log.debug(configInfo("networkTimeout", netTimeout));
+            log.debug(configInfo("sockTimeout", sockTimeout));
+            log.debug(configInfo("ackTimeout", ackTimeout));
+            log.debug(configInfo("maxAckTimeout", maxAckTimeout));
+            log.debug(configInfo("reconnectCount", reconCnt));
+            log.debug(configInfo("ipFinder", ipFinder));
+            log.debug(configInfo("ipFinderCleanFreq", ipFinderCleanFreq));
+            log.debug(configInfo("heartbeatFreq", hbFreq));
+            log.debug(configInfo("maxMissedHeartbeats", maxMissedHbs));
+            log.debug(configInfo("maxMissedClientHeartbeats", maxMissedClientHbs));
+            log.debug(configInfo("statsPrintFreq", statsPrintFreq));
+        }
+
+        // Warn on odd network timeout.
+        if (netTimeout < 3000)
+            U.warn(log, "Network timeout is too low (at least 3000 ms recommended): " + netTimeout);
+
+        // Warn on odd heartbeat frequency.
+        if (hbFreq < 2000)
+            U.warn(log, "Heartbeat frequency is too high (at least 2000 ms recommended): " + hbFreq);
+
+        registerMBean(gridName, this, TcpDiscoverySpiMBean.class);
+
+        if (ipFinder instanceof TcpDiscoveryMulticastIpFinder) {
+            TcpDiscoveryMulticastIpFinder mcastIpFinder = (TcpDiscoveryMulticastIpFinder)ipFinder;
+
+            // Default the multicast finder's local address to the SPI's one unless set explicitly.
+            if (mcastIpFinder.getLocalAddress() == null)
+                mcastIpFinder.setLocalAddress(locAddr);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException {
+        super.onContextInitialized0(spiCtx);
+
+        // Release threads blocked in getSpiContext().
+        ctxInitLatch.countDown();
+
+        spiCtx.registerPort(tcpSrvr.port, TCP);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Blocks until the SPI context has been initialized. If the waiting thread is interrupted,
+     * a warning is logged and the (possibly uninitialized) context is returned anyway.
+     */
+    @Override public IgniteSpiContext getSpiContext() {
+        if (ctxInitLatch.getCount() > 0) {
+            if (log.isDebugEnabled())
+                log.debug("Waiting for context initialization.");
+
+            try {
+                U.await(ctxInitLatch);
+
+                if (log.isDebugEnabled())
+                    log.debug("Context has been initialized.");
+            }
+            catch (GridInterruptedException e) {
+                U.warn(log, "Thread has been interrupted while waiting for SPI context initialization.", e);
+            }
+        }
+
+        return super.getSpiContext();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void spiStop() throws IgniteSpiException {
+        spiStop0(false);
+    }
+
+    /**
+     * Stops SPI either finally (node stop) or temporarily for reconnect.
+     * <p>
+     * On final stop a node-left message is sent and its verification is awaited for up to
+     * the network timeout; on disconnect a FAILED event is fired for every visible remote node.
+     *
+     * @param disconnect {@code True} if SPI is being disconnected.
+     * @throws org.apache.ignite.spi.IgniteSpiException If failed.
+     */
+    private void spiStop0(boolean disconnect) throws IgniteSpiException {
+        if (ctxInitLatch.getCount() > 0)
+            // Safety.
+            ctxInitLatch.countDown();
+
+        if (log.isDebugEnabled()) {
+            if (disconnect)
+                log.debug("Disconnecting SPI.");
+            else
+                log.debug("Preparing to start local node stop procedure.");
+        }
+
+        if (disconnect) {
+            synchronized (mux) {
+                spiState = DISCONNECTING;
+            }
+        }
+
+        if (msgWorker != null && msgWorker.isAlive() && !disconnect) {
+            // Send node left message only if it is final stop.
+            msgWorker.addMessage(new TcpDiscoveryNodeLeftMessage(locNodeId));
+
+            synchronized (mux) {
+                long threshold = U.currentTimeMillis() + netTimeout;
+
+                long timeout = netTimeout;
+
+                // Wait (at most netTimeout in total) until the state becomes LEFT.
+                while (spiState != LEFT && timeout > 0) {
+                    try {
+                        mux.wait(timeout);
+
+                        timeout = threshold - U.currentTimeMillis();
+                    }
+                    catch (InterruptedException ignored) {
+                        Thread.currentThread().interrupt();
+
+                        break;
+                    }
+                }
+
+                if (spiState == LEFT) {
+                    if (log.isDebugEnabled())
+                        log.debug("Verification for local node leave has been received from coordinator" +
+                            " (continuing stop procedure).");
+                }
+                else if (log.isInfoEnabled()) {
+                    log.info("No verification for local node leave has been received from coordinator" +
+                        " (will stop node anyway).");
+                }
+            }
+        }
+
+        U.interrupt(tcpSrvr);
+        U.join(tcpSrvr, log);
+
+        Collection<SocketReader> tmp;
+
+        // Copy under the mutex, then interrupt and join outside of it.
+        synchronized (mux) {
+            tmp = U.arrayList(readers);
+        }
+
+        U.interrupt(tmp);
+        U.joinThreads(tmp, log);
+
+        U.interrupt(hbsSnd);
+        U.join(hbsSnd, log);
+
+        U.interrupt(chkStatusSnd);
+        U.join(chkStatusSnd, log);
+
+        U.interrupt(ipFinderCleaner);
+        U.join(ipFinderCleaner, log);
+
+        U.interrupt(msgWorker);
+        U.join(msgWorker, log);
+
+        U.interrupt(sockTimeoutWorker);
+        U.join(sockTimeoutWorker, log);
+
+        U.interrupt(statsPrinter);
+        U.join(statsPrinter, log);
+
+        if (ipFinder != null)
+            ipFinder.close();
+
+        Collection<TcpDiscoveryNode> rmts = null;
+
+        if (!disconnect) {
+            // This is final stop.
+            unregisterMBean();
+
+            if (log.isDebugEnabled())
+                log.debug(stopInfo());
+        }
+        else {
+            getSpiContext().deregisterPorts();
+
+            // Remember remote nodes so that FAILED events can be fired for them after the ring is cleared.
+            rmts = ring.visibleRemoteNodes();
+        }
+
+        long topVer = ring.topologyVersion();
+
+        ring.clear();
+
+        if (rmts != null && !rmts.isEmpty()) {
+            // This is restart/disconnection and remote nodes are not empty.
+            // We need to fire FAIL event for each.
+            DiscoverySpiListener lsnr = this.lsnr;
+
+            if (lsnr != null) {
+                Collection<ClusterNode> processed = new LinkedList<>();
+
+                for (TcpDiscoveryNode n : rmts) {
+                    assert n.visible();
+
+                    processed.add(n);
+
+                    // Snapshot the remaining nodes. A lazy view filtered by 'processed' would keep shrinking
+                    // on later iterations, silently changing the topology that was handed to
+                    // updateTopologyHistory() and to the listener for this event.
+                    Collection<ClusterNode> remaining =
+                        F.viewReadOnly(rmts, F.<ClusterNode>identity(), F.notIn(processed));
+
+                    Collection<ClusterNode> top = Collections.unmodifiableList(new ArrayList<ClusterNode>(remaining));
+
+                    topVer++;
+
+                    Map<Long, Collection<ClusterNode>> hist = updateTopologyHistory(topVer, top);
+
+                    lsnr.onDiscovery(EVT_NODE_FAILED, topVer, n, top, hist);
+                }
+            }
+        }
+
+        printStatistics();
+
+        stats.clear();
+
+        synchronized (mux) {
+            // Clear stored data.
+            leavingNodes.clear();
+            failedNodes.clear();
+
+            spiState = DISCONNECTED;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void onContextDestroyed0() {
+        super.onContextDestroyed0();
+
+        if (ctxInitLatch.getCount() > 0)
+            // Safety.
+            ctxInitLatch.countDown();
+
+        getSpiContext().deregisterPorts();
+    }
+
+    /**
+     * Checks whether any address registered in the (non-shared) IP finder resolves to
+     * one of the local node's addresses.
+     *
+     * @return {@code true} if IP finder contains local address.
+     * @throws org.apache.ignite.spi.IgniteSpiException If any error occurs.
+     */
+    private boolean ipFinderHasLocalAddress() throws IgniteSpiException {
+        // Registered addresses are iterated in the outer loop so that the IP finder is queried once and
+        // each address is resolved (possibly via DNS) once, rather than once per local address.
+        for (InetSocketAddress addr : registeredAddresses()) {
+            try {
+                int port = addr.getPort();
+
+                InetSocketAddress resolved = addr.isUnresolved() ?
+                    new InetSocketAddress(InetAddress.getByName(addr.getHostName()), port) :
+                    new InetSocketAddress(addr.getAddress(), port);
+
+                if (F.contains(locNodeAddrs, resolved))
+                    return true;
+            }
+            catch (UnknownHostException ignored) {
+                // An unresolvable registered address cannot match a local address - skip it.
+ } + } + + return false; + } + + /** {@inheritDoc} */ + @Override public boolean pingNode(UUID nodeId) { + assert nodeId != null; + + if (nodeId == locNodeId) + return true; + + TcpDiscoveryNode node = ring.node(nodeId); + + if (node == null || !node.visible()) + return false; + + boolean res = pingNode(node); + + if (!res && !node.isClient()) { + LT.warn(log, null, "Failed to ping node (status check will be initiated): " + nodeId); + + msgWorker.addMessage(new TcpDiscoveryStatusCheckMessage(locNode, node.id())); + } + + return res; + } + + /** + * Pings the remote node to see if it's alive. + * + * @param node Node. + * @return {@code True} if ping succeeds. + */ + private boolean pingNode(TcpDiscoveryNode node) { + assert node != null; + + if (node.id().equals(locNodeId)) + return true; + + UUID clientNodeId = null; + + if (node.isClient()) { + clientNodeId = node.id(); + + node = ring.node(node.clientRouterNodeId()); + + if (node == null || !node.visible()) + return false; + } + + for (InetSocketAddress addr : getNodeAddresses(node, U.sameMacs(locNode, node))) { + try { + // ID returned by the node should be the same as ID of the parameter for ping to succeed. + IgniteBiTuple<UUID, Boolean> t = pingNode(addr, clientNodeId); + + return node.id().equals(t.get1()) && (clientNodeId == null || t.get2()); + } + catch (GridException e) { + if (log.isDebugEnabled()) + log.debug("Failed to ping node [node=" + node + ", err=" + e.getMessage() + ']'); + + // continue; + } + } + + return false; + } + + /** + * Pings the remote node by its address to see if it's alive. + * + * @param addr Address of the node. + * @return ID of the remote node if node alive. + * @throws org.apache.ignite.spi.IgniteSpiException If an error occurs. 
+ */ + private IgniteBiTuple<UUID, Boolean> pingNode(InetSocketAddress addr, @Nullable UUID clientNodeId) + throws GridException { + assert addr != null; + + if (F.contains(locNodeAddrs, addr)) + return F.t(locNodeId, false); + + GridFutureAdapterEx<IgniteBiTuple<UUID, Boolean>> fut = new GridFutureAdapterEx<>(); + + IgniteFuture<IgniteBiTuple<UUID, Boolean>> oldFut = pingMap.putIfAbsent(addr, fut); + + if (oldFut != null) + return oldFut.get(); + else { + Collection<Throwable> errs = null; + + try { + Socket sock = null; + + for (int i = 0; i < reconCnt; i++) { + try { + if (addr.isUnresolved()) + addr = new InetSocketAddress(InetAddress.getByName(addr.getHostName()), addr.getPort()); + + long tstamp = U.currentTimeMillis(); + + sock = openSocket(addr); + + writeToSocket(sock, new TcpDiscoveryPingRequest(locNodeId, clientNodeId)); + + TcpDiscoveryPingResponse res = readMessage(sock, null, netTimeout); + + if (locNodeId.equals(res.creatorNodeId())) { + if (log.isDebugEnabled()) + log.debug("Ping response from local node: " + res); + + break; + } + + stats.onClientSocketInitialized(U.currentTimeMillis() - tstamp); + + IgniteBiTuple<UUID, Boolean> t = F.t(res.creatorNodeId(), res.clientExists()); + + fut.onDone(t); + + return t; + } + catch (IOException | GridException e) { + if (errs == null) + errs = new ArrayList<>(); + + errs.add(e); + } + finally { + U.closeQuiet(sock); + } + } + } + catch (Throwable t) { + fut.onDone(t); + + throw U.cast(t); + } + finally { + if (!fut.isDone()) + fut.onDone(U.exceptionWithSuppressed("Failed to ping node by address: " + addr, errs)); + + boolean b = pingMap.remove(addr, fut); + + assert b; + } + + return fut.get(); + } + } + + /** {@inheritDoc} */ + @Override public void disconnect() throws IgniteSpiException { + spiStop0(true); + } + + /** {@inheritDoc} */ + @Override public void setAuthenticator(DiscoverySpiNodeAuthenticator nodeAuth) { + this.nodeAuth = nodeAuth; + } + + /** + * Tries to join this node to topology. 
+ * + * @throws org.apache.ignite.spi.IgniteSpiException If any error occurs. + */ + private void joinTopology() throws IgniteSpiException { + synchronized (mux) { + assert spiState == CONNECTING || spiState == DISCONNECTED; + + spiState = CONNECTING; + } + + GridSecurityCredentials locCred = (GridSecurityCredentials)locNode.getAttributes() + .get(GridNodeAttributes.ATTR_SECURITY_CREDENTIALS); + + // Marshal credentials for backward compatibility and security. + marshalCredentials(locNode); + + while (true) { + if (!sendJoinRequestMessage()) { + if (log.isDebugEnabled()) + log.debug("Join request message has not been sent (local node is the first in the topology)."); + + // Authenticate local node. + try { + GridSecurityContext subj = nodeAuth.authenticateNode(locNode, locCred); + + if (subj == null) + throw new IgniteSpiException("Authentication failed for local node: " + locNode.id()); + + Map<String, Object> attrs = new HashMap<>(locNode.attributes()); + + attrs.put(GridNodeAttributes.ATTR_SECURITY_SUBJECT, gridMarsh.marshal(subj)); + attrs.remove(GridNodeAttributes.ATTR_SECURITY_CREDENTIALS); + + locNode.setAttributes(attrs); + } + catch (GridException e) { + throw new IgniteSpiException("Failed to authenticate local node (will shutdown local node).", e); + } + + locNode.order(1); + locNode.internalOrder(1); + + gridStartTime = U.currentTimeMillis(); + + locNode.visible(true); + + ring.clear(); + + ring.topologyVersion(1); + + synchronized (mux) { + topHist.clear(); + + spiState = CONNECTED; + + mux.notifyAll(); + } + + notifyDiscovery(EVT_NODE_JOINED, 1, locNode); + + break; + } + + if (log.isDebugEnabled()) + log.debug("Join request message has been sent (waiting for coordinator response)."); + + synchronized (mux) { + long threshold = U.currentTimeMillis() + netTimeout; + + long timeout = netTimeout; + + while (spiState == CONNECTING && timeout > 0) { + try { + mux.wait(timeout); + + timeout = threshold - U.currentTimeMillis(); + } + catch 
(InterruptedException ignored) { + Thread.currentThread().interrupt(); + + throw new IgniteSpiException("Thread has been interrupted."); + } + } + + if (spiState == CONNECTED) + break; + else if (spiState == DUPLICATE_ID) + throw duplicateIdError((TcpDiscoveryDuplicateIdMessage)joinRes.get()); + else if (spiState == AUTH_FAILED) + throw authenticationFailedError((TcpDiscoveryAuthFailedMessage)joinRes.get()); + else if (spiState == CHECK_FAILED) + throw checkFailedError((TcpDiscoveryCheckFailedMessage)joinRes.get()); + else if (spiState == LOOPBACK_PROBLEM) { + TcpDiscoveryLoopbackProblemMessage msg = (TcpDiscoveryLoopbackProblemMessage)joinRes.get(); + + boolean locHostLoopback = locHost.isLoopbackAddress(); + + String firstNode = locHostLoopback ? "local" : "remote"; + + String secondNode = locHostLoopback ? "remote" : "local"; + + throw new IgniteSpiException("Failed to add node to topology because " + firstNode + + " node is configured to use loopback address, but " + secondNode + " node is not " + + "(consider changing 'localAddress' configuration parameter) " + + "[locNodeAddrs=" + U.addressesAsString(locNode) + ", rmtNodeAddrs=" + + U.addressesAsString(msg.addresses(), msg.hostNames()) + ']'); + } + else + LT.warn(log, null, "Node has not been connected to topology and will repeat join process. " + + "Check remote nodes logs for possible error messages. " + + "Note that large topology may require significant time to start. " + + "Increase 'GridTcpDiscoverySpi.networkTimeout' configuration property " + + "if getting this message on the starting nodes [networkTimeout=" + netTimeout + ']'); + } + } + + assert locNode.order() != 0; + assert locNode.internalOrder() != 0; + + if (log.isDebugEnabled()) + log.debug("Discovery SPI has been connected to topology with order: " + locNode.internalOrder()); + } + + /** + * @param msg Error message. + * @return Remote grid version parsed from error message. 
+ * @deprecated This method was created for preserving backward compatibility. During major version update + * parsing of error message should be replaced with new {@link org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage} + * which contains all necessary information. + */ + @Deprecated + @Nullable private String parseRemoteVersion(String msg) { + msg = msg.replaceAll("\\s", ""); + + final String verPrefix = "rmtBuildVer="; + + int startIdx = msg.indexOf(verPrefix); + int endIdx = msg.indexOf(',', startIdx); + + if (endIdx < 0) + endIdx = msg.indexOf(']', startIdx); + + if (startIdx < 0 || endIdx < 0) + return null; + + return msg.substring(startIdx + verPrefix.length() - 1, endIdx); + } + + /** + * Tries to send join request message to a random node presenting in topology. + * Address is provided by {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder} and message is + * sent to first node connection succeeded to. + * + * @return {@code true} if send succeeded. + * @throws org.apache.ignite.spi.IgniteSpiException If any error occurs. + */ + @SuppressWarnings({"BusyWait"}) + private boolean sendJoinRequestMessage() throws IgniteSpiException { + TcpDiscoveryAbstractMessage joinReq = new TcpDiscoveryJoinRequestMessage(locNode, + exchange.collect(locNodeId)); + + // Time when it has been detected, that addresses from IP finder do not respond. + long noResStart = 0; + + while (true) { + Collection<InetSocketAddress> addrs = resolvedAddresses(); + + if (F.isEmpty(addrs)) + return false; + + boolean retry = false; + GridException errs = null; + + for (InetSocketAddress addr : addrs) { + try { + Integer res = sendMessageDirectly(joinReq, addr); + + assert res != null; + + noResAddrs.remove(addr); + + // Address is responsive, reset period start. + noResStart = 0; + + switch (res) { + case RES_WAIT: + // Concurrent startup, try sending join request again or wait if no success. 
+ retry = true; + + break; + case RES_OK: + if (log.isDebugEnabled()) + log.debug("Join request message has been sent to address [addr=" + addr + + ", req=" + joinReq + ']'); + + // Join request sending succeeded, wait for response from topology. + return true; + + default: + // Concurrent startup, try next node. + if (res == RES_CONTINUE_JOIN) { + if (!fromAddrs.contains(addr)) + retry = true; + } + else { + if (log.isDebugEnabled()) + log.debug("Unexpected response to join request: " + res); + + retry = true; + } + + break; + } + } + catch (IgniteSpiException e) { + if (errs == null) + errs = new GridException("Multiple connection attempts failed."); + + errs.addSuppressed(e); + + if (log.isDebugEnabled()) { + IOException ioe = X.cause(e, IOException.class); + + // Ternary must be parenthesized: otherwise '+' binds first, the condition is always + // true and ioe.getMessage() throws NPE when there is no IOException cause. + log.debug("Failed to send join request message [addr=" + addr + + ", msg=" + (ioe != null ? ioe.getMessage() : e.getMessage()) + ']'); + } + + noResAddrs.add(addr); + } + } + + if (retry) { + if (log.isDebugEnabled()) + log.debug("Concurrent discovery SPI start has been detected (local node should wait)."); + + try { + U.sleep(2000); + } + catch (GridInterruptedException e) { + throw new IgniteSpiException("Thread has been interrupted.", e); + } + } + else if (!ipFinder.isShared() && !ipFinderHasLocAddr) { + if (errs != null && X.hasCause(errs, ConnectException.class)) + LT.warn(log, null, "Failed to connect to any address from IP finder " + + "(make sure IP finder addresses are correct and firewalls are disabled on all host machines): " + + addrs); + + if (joinTimeout > 0) { + if (noResStart == 0) + noResStart = U.currentTimeMillis(); + else if (U.currentTimeMillis() - noResStart > joinTimeout) + throw new IgniteSpiException( + "Failed to connect to any address from IP finder within join timeout " + + "(make sure IP finder addresses are correct, and operating system firewalls are disabled " + + "on all host machines, or consider increasing 'joinTimeout' configuration property): " + + addrs, errs); + } + +
try { + U.sleep(2000); + } + catch (GridInterruptedException e) { + throw new IgniteSpiException("Thread has been interrupted.", e); + } + } + else + break; + } + + return false; + } + + /** + * Establishes connection to an address, sends message and returns the response (if any). + * + * @param msg Message to send. + * @param addr Address to send message to. + * @return Response read from the recipient or {@code null} if no response is supposed. + * @throws org.apache.ignite.spi.IgniteSpiException If an error occurs. + */ + @Nullable private Integer sendMessageDirectly(TcpDiscoveryAbstractMessage msg, InetSocketAddress addr) + throws IgniteSpiException { + assert msg != null; + assert addr != null; + + Collection<Throwable> errs = null; + + Socket sock = null; + + long ackTimeout0 = ackTimeout; + + int connectAttempts = 1; + + boolean joinReqSent = false; + + for (int i = 0; i < reconCnt; i++) { + // Need to set to false on each new iteration, + // since remote node may leave in the middle of the first iteration. + joinReqSent = false; + + boolean openSock = false; + + try { + long tstamp = U.currentTimeMillis(); + + sock = openSocket(addr); + + openSock = true; + + // Handshake. + writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId)); + + TcpDiscoveryHandshakeResponse res = readMessage(sock, null, ackTimeout0); + + if (locNodeId.equals(res.creatorNodeId())) { + if (log.isDebugEnabled()) + log.debug("Handshake response from local node: " + res); + + break; + } + + stats.onClientSocketInitialized(U.currentTimeMillis() - tstamp); + + // Send message. 
+ tstamp = U.currentTimeMillis(); + + writeToSocket(sock, msg); + + stats.onMessageSent(msg, U.currentTimeMillis() - tstamp); + + if (debugMode) + debugLog("Message has been sent directly to address [msg=" + msg + ", addr=" + addr + + ", rmtNodeId=" + res.creatorNodeId() + ']'); + + if (log.isDebugEnabled()) + log.debug("Message has been sent directly to address [msg=" + msg + ", addr=" + addr + + ", rmtNodeId=" + res.creatorNodeId() + ']'); + + // Connection has been established, but + // join request may not be unmarshalled on remote host. + // E.g. due to class not found issue. + joinReqSent = msg instanceof TcpDiscoveryJoinRequestMessage; + + return readReceipt(sock, ackTimeout0); + } + catch (ClassCastException e) { + // This issue is rarely reproducible on AmazonEC2, but never + // on dedicated machines. + if (log.isDebugEnabled()) + U.error(log, "Class cast exception on direct send: " + addr, e); + + if (errs == null) + errs = new ArrayList<>(); + + errs.add(e); + } + catch (IOException | GridException e) { + if (log.isDebugEnabled()) + log.error("Exception on direct send: " + e.getMessage(), e); + + if (errs == null) + errs = new ArrayList<>(); + + errs.add(e); + + if (!openSock) { + // Reconnect for the second time, if connection is not established. + if (connectAttempts < 2) { + connectAttempts++; + + continue; + } + + break; // Don't retry if we can not establish connection. + } + + if (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class)) { + ackTimeout0 *= 2; + + if (!checkAckTimeout(ackTimeout0)) + break; + } + } + finally { + U.closeQuiet(sock); + } + } + + if (joinReqSent) { + if (log.isDebugEnabled()) + log.debug("Join request has been sent, but receipt has not been read (returning RES_WAIT)."); + + // Topology will not include this node, + // however, warning on timed out join will be output. 
+ return RES_OK; + } + + throw new IgniteSpiException( + "Failed to send message to address [addr=" + addr + ", msg=" + msg + ']', + U.exceptionWithSuppressed("Failed to send message to address " + + "[addr=" + addr + ", msg=" + msg + ']', errs)); + } + + /** + * Marshalls credentials with discovery SPI marshaller (will replace attribute value). + * + * @param node Node to marshall credentials for. + * @throws org.apache.ignite.spi.IgniteSpiException If marshalling failed. + */ + private void marshalCredentials(TcpDiscoveryNode node) throws IgniteSpiException { + try { + // Use security-unsafe getter. + Map<String, Object> attrs = new HashMap<>(node.getAttributes()); + + attrs.put(GridNodeAttributes.ATTR_SECURITY_CREDENTIALS, + marsh.marshal(attrs.get(GridNodeAttributes.ATTR_SECURITY_CREDENTIALS))); + + node.setAttributes(attrs); + } + catch (GridException e) { + throw new IgniteSpiException("Failed to marshal node security credentials: " + node.id(), e); + } + } + + /** + * Unmarshalls credentials with discovery SPI marshaller (will not replace attribute value). + * + * @param node Node to unmarshall credentials for. + * @return Security credentials. + * @throws org.apache.ignite.spi.IgniteSpiException If unmarshal fails. + */ + private GridSecurityCredentials unmarshalCredentials(TcpDiscoveryNode node) throws IgniteSpiException { + try { + byte[] credBytes = (byte[])node.getAttributes().get(GridNodeAttributes.ATTR_SECURITY_CREDENTIALS); + + if (credBytes == null) + return null; + + return marsh.unmarshal(credBytes, null); + } + catch (GridException e) { + throw new IgniteSpiException("Failed to unmarshal node security credentials: " + node.id(), e); + } + } + + /** + * @param ackTimeout Acknowledgement timeout. + * @return {@code True} if acknowledgement timeout is less or equal to + * maximum acknowledgement timeout, {@code false} otherwise. 
+ */ + private boolean checkAckTimeout(long ackTimeout) { + if (ackTimeout > maxAckTimeout) { + LT.warn(log, null, "Acknowledgement timeout is greater than maximum acknowledgement timeout " + + "(consider increasing 'maxAckTimeout' configuration property) " + + "[ackTimeout=" + ackTimeout + ", maxAckTimeout=" + maxAckTimeout + ']'); + + return false; + } + + return true; + } + + /** + * Notify external listener on discovery event. + * + * @param type Discovery event type. See {@link org.apache.ignite.events.IgniteDiscoveryEvent} for more details. + * @param topVer Topology version. + * @param node Remote node this event is connected with. + */ + private void notifyDiscovery(int type, long topVer, TcpDiscoveryNode node) { + assert type > 0; + assert node != null; + + DiscoverySpiListener lsnr = this.lsnr; + + TcpDiscoverySpiState spiState = spiStateCopy(); + + if (lsnr != null && node.visible() && (spiState == CONNECTED || spiState == DISCONNECTING)) { + if (log.isDebugEnabled()) + log.debug("Discovery notification [node=" + node + ", spiState=" + spiState + + ", type=" + U.gridEventName(type) + ", topVer=" + topVer + ']'); + + Collection<ClusterNode> top = F.<TcpDiscoveryNode, ClusterNode>upcast(ring.visibleNodes()); + + Map<Long, Collection<ClusterNode>> hist = updateTopologyHistory(topVer, top); + + lsnr.onDiscovery(type, topVer, node, top, hist); + } + else if (log.isDebugEnabled()) + log.debug("Skipped discovery notification [node=" + node + ", spiState=" + spiState + + ", type=" + U.gridEventName(type) + ", topVer=" + topVer + ']'); + } + + /** + * Update topology history with new topology snapshots. + * + * @param topVer Topology version. + * @param top Topology snapshot. + * @return Copy of updated topology history. 
+ */ + @Nullable private Map<Long, Collection<ClusterNode>> updateTopologyHistory(long topVer, Collection<ClusterNode> top) { + synchronized (mux) { + if (topHist.containsKey(topVer)) + return null; + + topHist.put(topVer, top); + + while (topHist.size() > topHistSize) + topHist.remove(topHist.firstKey()); + + if (log.isDebugEnabled()) + log.debug("Added topology snapshot to history, topVer=" + topVer + ", historySize=" + topHist.size()); + + return new TreeMap<>(topHist); + } + } + + /** + * @param node Node. + * @return {@link LinkedHashSet} of internal and external addresses of provided node. + * Internal addresses placed before external addresses. + */ + @SuppressWarnings("TypeMayBeWeakened") + private LinkedHashSet<InetSocketAddress> getNodeAddresses(TcpDiscoveryNode node) { + LinkedHashSet<InetSocketAddress> res = new LinkedHashSet<>(node.socketAddresses()); + + Collection<InetSocketAddress> extAddrs = node.attribute(createSpiAttributeName(ATTR_EXT_ADDRS)); + + if (extAddrs != null) + res.addAll(extAddrs); + + return res; + } + + /** + * @param node Node. + * @param sameHost Same host flag. + * @return {@link LinkedHashSet} of internal and external addresses of provided node. + * Internal addresses placed before external addresses. + * Internal addresses will be sorted with {@code inetAddressesComparator(sameHost)}. + */ + @SuppressWarnings("TypeMayBeWeakened") + private LinkedHashSet<InetSocketAddress> getNodeAddresses(TcpDiscoveryNode node, boolean sameHost) { + List<InetSocketAddress> addrs = U.arrayList(node.socketAddresses()); + + Collections.sort(addrs, U.inetAddressesComparator(sameHost)); + + LinkedHashSet<InetSocketAddress> res = new LinkedHashSet<>(addrs); + + Collection<InetSocketAddress> extAddrs = node.attribute(createSpiAttributeName(ATTR_EXT_ADDRS)); + + if (extAddrs != null) + res.addAll(extAddrs); + + return res; + } + + /** + * Checks whether local node is coordinator. 
Nodes that are leaving or failed + * (but are still in topology) are removed from search. + * + * @return {@code true} if local node is coordinator. + */ + private boolean isLocalNodeCoordinator() { + synchronized (mux) { + boolean crd = spiState == CONNECTED && locNode.equals(resolveCoordinator()); + + if (crd) + stats.onBecomingCoordinator(); + + return crd; + } + } + + /** + * @return Spi state copy. + */ + private TcpDiscoverySpiState spiStateCopy() { + TcpDiscoverySpiState state; + + synchronized (mux) { + state = spiState; + } + + return state; + } + + /** + * Resolves coordinator. Nodes that are leaving or failed (but are still in + * topology) are removed from search. + * + * @return Coordinator node or {@code null} if there are no coordinator + * (i.e. local node is the last one and is currently stopping). + */ + @Nullable private TcpDiscoveryNode resolveCoordinator() { + return resolveCoordinator(null); + } + + /** + * Resolves coordinator. Nodes that are leaving or failed (but are still in + * topology) are removed from search as well as provided filter. + * + * @param filter Nodes to exclude when resolving coordinator (optional). + * @return Coordinator node or {@code null} if there are no coordinator + * (i.e. local node is the last one and is currently stopping). + */ + @Nullable private TcpDiscoveryNode resolveCoordinator( + @Nullable Collection<TcpDiscoveryNode> filter) { + synchronized (mux) { + Collection<TcpDiscoveryNode> excluded = F.concat(false, failedNodes, leavingNodes); + + if (!F.isEmpty(filter)) + excluded = F.concat(false, excluded, filter); + + return ring.coordinator(excluded); + } + } + + /** + * Prints SPI statistics. 
+ */ + private void printStatistics() { + if (log.isInfoEnabled() && statsPrintFreq > 0) { + int failedNodesSize; + int leavingNodesSize; + + synchronized (mux) { + failedNodesSize = failedNodes.size(); + leavingNodesSize = leavingNodes.size(); + } + + Runtime runtime = Runtime.getRuntime(); + + TcpDiscoveryNode coord = resolveCoordinator(); + + log.info("Discovery SPI statistics [statistics=" + stats + ", spiState=" + spiStateCopy() + + ", coord=" + coord + + ", topSize=" + ring.allNodes().size() + + ", leavingNodesSize=" + leavingNodesSize + ", failedNodesSize=" + failedNodesSize + + ", msgWorker.queue.size=" + (msgWorker != null ? msgWorker.queueSize() : "N/A") + + ", lastUpdate=" + (locNode != null ? U.format(locNode.lastUpdateTime()) : "N/A") + + ", heapFree=" + runtime.freeMemory() / (1024 * 1024) + + "M, heapTotal=" + runtime.maxMemory() / (1024 * 1024) + "M]"); + } + } + + /** + * @param msg Message to prepare. + * @param destNodeId Destination node ID. + * @param msgs Messages to include. + * @param discardMsgId Discarded message ID. + */ + private void prepareNodeAddedMessage(TcpDiscoveryAbstractMessage msg, UUID destNodeId, + @Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable IgniteUuid discardMsgId) { + assert destNodeId != null; + + if (msg instanceof TcpDiscoveryNodeAddedMessage) { + TcpDiscoveryNodeAddedMessage nodeAddedMsg = (TcpDiscoveryNodeAddedMessage)msg; + + TcpDiscoveryNode node = nodeAddedMsg.node(); + + if (node.id().equals(destNodeId)) { + Collection<TcpDiscoveryNode> allNodes = ring.allNodes(); + Collection<TcpDiscoveryNode> topToSend = new ArrayList<>(allNodes.size()); + + for (TcpDiscoveryNode n0 : allNodes) { + assert n0.internalOrder() != 0 : n0; + + // Skip next node and nodes added after next + // in case this message is resent due to failures/leaves. + // There will be separate messages for nodes with greater + // internal order. 
+ if (n0.internalOrder() < nodeAddedMsg.node().internalOrder()) + topToSend.add(n0); + } + + nodeAddedMsg.topology(topToSend); + nodeAddedMsg.messages(msgs, discardMsgId); + + Map<Long, Collection<ClusterNode>> hist; + + synchronized (mux) { + hist = new TreeMap<>(topHist); + } + + nodeAddedMsg.topologyHistory(hist); + } + } + } + + /** + * @param msg Message to clear. + */ + private void clearNodeAddedMessage(TcpDiscoveryAbstractMessage msg) { + if (msg instanceof TcpDiscoveryNodeAddedMessage) { + // Nullify topology before registration. + TcpDiscoveryNodeAddedMessage nodeAddedMsg = (TcpDiscoveryNodeAddedMessage)msg; + + nodeAddedMsg.topology(null); + nodeAddedMsg.topologyHistory(null); + nodeAddedMsg.messages(null, null); + } + } + + /** + * <strong>FOR TEST ONLY!!!</strong> + * <p> + * Simulates this node failure by stopping service threads. So, node will become + * unresponsive. + * <p> + * This method is intended for test purposes only. + */ + void simulateNodeFailure() { + U.warn(log, "Simulating node failure: " + locNodeId); + + U.interrupt(tcpSrvr); + U.join(tcpSrvr, log); + + U.interrupt(hbsSnd); + U.join(hbsSnd, log); + + U.interrupt(chkStatusSnd); + U.join(chkStatusSnd, log); + + U.interrupt(ipFinderCleaner); + U.join(ipFinderCleaner, log); + + Collection<SocketReader> tmp; + + synchronized (mux) { + tmp = U.arrayList(readers); + } + + U.interrupt(tmp); + U.joinThreads(tmp, log); + + U.interrupt(msgWorker); + U.join(msgWorker, log); + + U.interrupt(statsPrinter); + U.join(statsPrinter, log); + } + + /** + * <strong>FOR TEST ONLY!!!</strong> + * <p> + * Simulates situation when next node is still alive but is bypassed + * since it has been excluded from the ring, possibly, due to short time + * network problems. + * <p> + * This method is intended for test purposes only. 
+ */ + void forceNextNodeFailure() { + U.warn(log, "Next node will be forcibly failed (if any)."); + + TcpDiscoveryNode next; + + synchronized (mux) { + next = ring.nextNode(failedNodes); + } + + if (next != null) + msgWorker.addMessage(new TcpDiscoveryNodeFailedMessage(locNodeId, next.id(), next.internalOrder())); + } + + /** + * <strong>FOR TEST ONLY!!!</strong> + * <p> + * This method is intended for test purposes only. + * + * @param msg Message. + */ + void onBeforeMessageSentAcrossRing(Serializable msg) { + // No-op. + } + + /** + * <strong>FOR TEST ONLY!!!</strong> + * <p> + * This method is intended for test purposes only. + * + * @return Nodes ring. + */ + TcpDiscoveryNodesRing ring() { + return ring; + } + + /** {@inheritDoc} */ + @Override public void dumpDebugInfo() { + dumpDebugInfo(log); + } + + /** + * @param log Logger. + */ + public void dumpDebugInfo(IgniteLogger log) { + if (!debugMode) { + U.quietAndWarn(log, "Failed to dump debug info (discovery SPI was not configured " + + "in debug mode, consider setting 'debugMode' configuration property to 'true')."); + + return; + } + + assert log.isInfoEnabled(); + + synchronized (mux) { + StringBuilder b = new StringBuilder(U.nl()); + + b.append(">>>").append(U.nl()); + b.append(">>>").append("Dumping discovery SPI debug info.").append(U.nl()); + b.append(">>>").append(U.nl()); + + b.append("Local node ID: ").append(locNodeId).append(U.nl()).append(U.nl()); + b.append("Local node: ").append(locNode).append(U.nl()).append(U.nl()); + b.append("SPI state: ").append(spiState).append(U.nl()).append(U.nl()); + + b.append("Internal threads: ").append(U.nl()); + + b.append(" Message worker: ").append(threadStatus(msgWorker)).append(U.nl()); + b.append(" Check status sender: ").append(threadStatus(chkStatusSnd)).append(U.nl()); + b.append(" HB sender: ").append(threadStatus(hbsSnd)).append(U.nl()); + b.append(" Socket timeout worker: ").append(threadStatus(sockTimeoutWorker)).append(U.nl()); + b.append(" IP 
finder cleaner: ").append(threadStatus(ipFinderCleaner)).append(U.nl()); + b.append(" Stats printer: ").append(threadStatus(statsPrinter)).append(U.nl()); + + b.append(U.nl()); + + b.append("Socket readers: ").append(U.nl()); + + for (SocketReader rdr : readers) + b.append(" ").append(rdr).append(U.nl()); + + b.append(U.nl()); + + b.append("In-memory log messages: ").append(U.nl()); + + for (String msg : debugLog) + b.append(" ").append(msg).append(U.nl()); + + b.append(U.nl()); + + b.append("Leaving nodes: ").append(U.nl()); + + for (TcpDiscoveryNode node : leavingNodes) + b.append(" ").append(node.id()).append(U.nl()); + + b.append(U.nl()); + + b.append("Failed nodes: ").append(U.nl()); + + for (TcpDiscoveryNode node : failedNodes) + b.append(" ").append(node.id()).append(U.nl()); + + b.append(U.nl()); + + b.append("Stats: ").append(stats).append(U.nl()); + + U.quietAndInfo(log, b.toString()); + } + } + + /** + * @param msg Message. + */ + private void debugLog(String msg) { + assert debugMode; + + String msg0 = new SimpleDateFormat("[HH:mm:ss,SSS]").format(new Date(System.currentTimeMillis())) + + '[' + Thread.currentThread().getName() + "][" + locNodeId + "-" + locNode.internalOrder() + "] " + + msg; + + debugLog.add(msg0); + + int delta = debugLog.size() - debugMsgHist; + + for (int i = 0; i < delta && debugLog.size() > debugMsgHist; i++) + debugLog.poll(); + } + + /** + * @param msg Message. + * @return {@code True} if recordable in debug mode. + */ + private boolean recordable(TcpDiscoveryAbstractMessage msg) { + return !(msg instanceof TcpDiscoveryHeartbeatMessage) && + !(msg instanceof TcpDiscoveryStatusCheckMessage) && + !(msg instanceof TcpDiscoveryDiscardMessage); + } + + /** + * @param t Thread. + * @return Status as string. + */ + private String threadStatus(Thread t) { + if (t == null) + return "N/A"; + + return t.isAlive() ? "alive" : "dead"; + } + + /** + * Checks if two given {@link GridSecurityPermissionSet} objects contain the same permissions. 
+ * Each permission belongs to one of three groups : cache, task or system. + * + * @param locPerms The first set of permissions. + * @param rmtPerms The second set of permissions. + * @return {@code True} if given parameters contain the same permissions, {@code False} otherwise. + */ + private boolean permissionsEqual(GridSecurityPermissionSet locPerms, GridSecurityPermissionSet rmtPerms) { + boolean dfltAllowMatch = !(locPerms.defaultAllowAll() ^ rmtPerms.defaultAllowAll()); + + boolean bothHaveSamePerms = F.eqNotOrdered(rmtPerms.systemPermissions(), locPerms.systemPermissions()) && + F.eqNotOrdered(rmtPerms.cachePermissions(), locPerms.cachePermissions()) && + F.eqNotOrdered(rmtPerms.taskPermissions(), locPerms.taskPermissions()); + + return dfltAllowMatch && bothHaveSamePerms; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TcpDiscoverySpi.class, this); + } + + /** + * Thread that sends heartbeats. + */ + private class HeartbeatsSender extends IgniteSpiThread { + /** + * Constructor. 
+ */ + private HeartbeatsSender() { + super(gridName, "tcp-disco-hb-sender", log); + + setPriority(threadPri); + } + + /** {@inheritDoc} */ + @SuppressWarnings("BusyWait") + @Override protected void body() throws InterruptedException { + while (!isLocalNodeCoordinator()) + Thread.sleep(1000); + + if (log.isDebugEnabled()) + log.debug("Heartbeats sender has been started."); + + while (!isInterrupted()) { + if (spiStateCopy() != CONNECTED) { + if (log.isDebugEnabled()) + log.debug("Stopping heartbeats sender (SPI is not connected to topology)."); + + return; + } + + TcpDiscoveryHeartbeatMessage msg = new TcpDiscoveryHeartbeatMessage(locNodeId); + + msg.verify(locNodeId); + + msgWorker.addMessage(msg); + + Thread.sleep(hbFreq); + } + } + } + + /** + * Thread that sends status check messages to next node if local node has not + * been receiving heartbeats ({@link org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHeartbeatMessage}) + * for {@link TcpDiscoverySpi#getMaxMissedHeartbeats()} * + * {@link TcpDiscoverySpi#getHeartbeatFrequency()}. + */ + private class CheckStatusSender extends IgniteSpiThread { + /** + * Constructor. + */ + private CheckStatusSender() { + super(gridName, "tcp-disco-status-check-sender", log); + + setPriority(threadPri); + } + + /** {@inheritDoc} */ + @SuppressWarnings("BusyWait") + @Override protected void body() throws InterruptedException { + if (log.isDebugEnabled()) + log.debug("Status check sender has been started."); + + // Only 1 heartbeat missing is acceptable. 1 sec is added to avoid false alarm. + long checkTimeout = (long)maxMissedHbs * hbFreq + 1000; + + long lastSent = 0; + + while (!isInterrupted()) { + // 1. Determine timeout. + if (lastSent < locNode.lastUpdateTime()) + lastSent = locNode.lastUpdateTime(); + + long timeout = (lastSent + checkTimeout) - U.currentTimeMillis(); + + if (timeout > 0) + Thread.sleep(timeout); + + // 2. Check if SPI is still connected. 
+ if (spiStateCopy() != CONNECTED) { + if (log.isDebugEnabled()) + log.debug("Stopping status check sender (SPI is not connected to topology)."); + + return; + } + + // 3. Was there an update? + if (locNode.lastUpdateTime() > lastSent || !ring.hasRemoteNodes()) { + if (log.isDebugEnabled()) + log.debug("Skipping status check send " + + "[locNodeLastUpdate=" + U.format(locNode.lastUpdateTime()) + + ", hasRmts=" + ring.hasRemoteNodes() + ']'); + + continue; + } + + // 4. Send status check message. + lastSent = U.currentTimeMillis(); + + msgWorker.addMessage(new TcpDiscoveryStatusCheckMessage(locNode, null)); + } + } + } + + /** + * Thread that cleans IP finder and keeps it in the correct state, unregistering + * addresses of the nodes that has left the topology. + * <p> + * This thread should run only on coordinator node and will clean IP finder + * if and only if {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder#isShared()} is {@code true}. + */ + private class IpFinderCleaner extends IgniteSpiThread { + /** + * Constructor. + */ + private IpFinderCleaner() { + super(gridName, "tcp-disco-ip-finder-cleaner", log); + + setPriority(threadPri); + } + + /** {@inheritDoc} */ + @SuppressWarnings("BusyWait") + @Override protected void body() throws InterruptedException { + if (log.isDebugEnabled()) + log.debug("IP finder cleaner has been started."); + + while (!isInterrupted()) { + Thread.sleep(ipFinderCleanFreq); + + if (!isLocalNodeCoordinator()) + continue; + + if (spiStateCopy() != CONNECTED) { + if (log.isDebugEnabled()) + log.debug("Stopping IP finder cleaner (SPI is not connected to topology)."); + + return; + } + + if (ipFinder.isShared()) + cleanIpFinder(); + } + } + + /** + * Cleans IP finder. + */ + private void cleanIpFinder() { + assert ipFinder.isShared(); + + try { + // Addresses that belongs to nodes in topology. 
+ Collection<InetSocketAddress> currAddrs = F.flatCollections( + F.viewReadOnly( + ring.allNodes(), + new C1<TcpDiscoveryNode, Collection<InetSocketAddress>>() { + @Override public Collection<InetSocketAddress> apply(TcpDiscoveryNode node) { + return !node.isClient() ? getNodeAddresses(node) : + Collections.<InetSocketAddress>emptyList(); + } + } + ) + ); + + // Addresses registered in IP finder. + Collection<InetSocketAddress> regAddrs = registeredAddresses(); + + // Remove all addresses that belong to alive nodes, leave dead-node addresses. + Collection<InetSocketAddress> rmvAddrs = F.view( + regAddrs, + F.notContains(currAddrs), + new P1<InetSocketAddress>() { + private final Map<InetSocketAddress, Boolean> pingResMap = + new HashMap<>(); + + @Override public boolean apply(InetSocketAddress addr) { + Boolean res = pingResMap.get(addr); + + if (res == null) { + try { + res = pingNode(addr, null).get1() != null; + } + catch (GridException e) { + if (log.isDebugEnabled()) + log.debug("Failed to ping node [addr=" + addr + + ", err=" + e.getMessage() + ']'); + + res = false; + } + finally { + pingResMap.put(addr, res); + } + } + + return !res; + } + } + ); + + // Unregister dead-nodes addresses. + if (!rmvAddrs.isEmpty()) { + ipFinder.unregisterAddresses(rmvAddrs); + + if (log.isDebugEnabled()) + log.debug("Unregistered addresses from IP finder: " + rmvAddrs); + } + + // Addresses that were removed by mistake (e.g. on segmentation). + Collection<InetSocketAddress> missingAddrs = F.view( + currAddrs, + F.notContains(regAddrs) + ); + + // Re-register missing addresses. + if (!missingAddrs.isEmpty()) { + ipFinder.registerAddresses(missingAddrs); + + if (log.isDebugEnabled()) + log.debug("Registered missing addresses in IP finder: " + missingAddrs); + } + } + catch (IgniteSpiException e) { + LT.error(log, e, "Failed to clean IP finder up."); + } + } + } + + /** + * Pending messages container. 
+ */ + private static class PendingMessages { + /** */ + private static final int MAX = 1024; + + /** Pending messages. */ + private final Queue<TcpDiscoveryAbstractMessage> msgs = new ArrayDeque<>(MAX * 2); + + /** Discarded message ID. */ + private IgniteUuid discardId; + + /** + * Adds pending message and shrinks queue if it exceeds limit + * (messages that were not discarded yet are never removed). + * + * @param msg Message to add. + */ + void add(TcpDiscoveryAbstractMessage msg) { + msgs.add(msg); + + while (msgs.size() > MAX) { + TcpDiscoveryAbstractMessage polled = msgs.poll(); + + assert polled != null; + + if (polled.id().equals(discardId)) + break; + } + } + + /** + * Gets messages starting from provided ID (exclusive). If such + * message is not found, {@code null} is returned (this indicates + * a failure condition when it was already removed from queue). + * + * @param lastMsgId Last message ID. + * @return Collection of messages. + */ + @Nullable Collection<TcpDiscoveryAbstractMessage> messages(IgniteUuid lastMsgId) { + assert lastMsgId != null; + + Collection<TcpDiscoveryAbstractMessage> copy = new ArrayList<>(msgs.size()); + + boolean skip = true; + + for (TcpDiscoveryAbstractMessage msg : msgs) { + if (skip) { + if (msg.id().equals(lastMsgId)) + skip = false; + } + else + copy.add(msg); + } + + return !skip ? copy : null; + } + + /** + * Resets pending messages. + * + * @param msgs Message. + * @param discardId Discarded message ID. + */ + void reset(@Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable IgniteUuid discardId) { + this.msgs.clear(); + + if (msgs != null) + this.msgs.addAll(msgs); + + this.discardId = discardId; + } + + /** + * Clears pending messages. + */ + void clear() { + msgs.clear(); + + discardId = null; + } + + /** + * Discards message with provided ID and all before it. + * + * @param id Discarded message ID. 
+ */ + void discard(IgniteUuid id) { + discardId = id; + } + } + + /** + * Message worker thread for messages
<TRUNCATED>