http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpDiscoverySpi.java deleted file mode 100644 index 40e4418..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpDiscoverySpi.java +++ /dev/null @@ -1,5144 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.discovery.tcp; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.marshaller.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.spi.*; -import org.gridgain.grid.*; -import org.gridgain.grid.kernal.*; -import org.gridgain.grid.kernal.managers.security.*; -import org.gridgain.grid.security.*; -import org.gridgain.grid.spi.discovery.*; -import org.gridgain.grid.spi.discovery.tcp.internal.*; -import org.gridgain.grid.spi.discovery.tcp.ipfinder.multicast.*; -import org.gridgain.grid.spi.discovery.tcp.messages.*; -import org.gridgain.grid.util.*; -import org.gridgain.grid.util.future.*; -import org.gridgain.grid.util.lang.*; -import org.gridgain.grid.util.tostring.*; -import org.gridgain.grid.util.typedef.*; -import org.gridgain.grid.util.typedef.internal.*; -import org.jdk8.backport.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.net.*; -import java.text.*; -import java.util.*; -import java.util.concurrent.*; 
- -import static org.apache.ignite.events.IgniteEventType.*; -import static org.gridgain.grid.kernal.GridNodeAttributes.*; -import static org.apache.ignite.spi.IgnitePortProtocol.*; -import static org.gridgain.grid.spi.discovery.tcp.internal.TcpDiscoverySpiState.*; -import static org.gridgain.grid.spi.discovery.tcp.messages.TcpDiscoveryHeartbeatMessage.*; -import static org.gridgain.grid.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessage.*; - -/** - * Discovery SPI implementation that uses TCP/IP for node discovery. - * <p> - * Nodes are organized in ring. So almost all network exchange (except few cases) is - * done across it. - * <p> - * At startup SPI tries to send messages to random IP taken from - * {@link org.gridgain.grid.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder} about self start (stops when send succeeds) - * and then this info goes to coordinator. When coordinator processes join request - * and issues node added messages and all other nodes then receive info about new node. - * <h1 class="header">Configuration</h1> - * <h2 class="header">Mandatory</h2> - * There are no mandatory configuration parameters. - * <h2 class="header">Optional</h2> - * The following configuration parameters are optional: - * <ul> - * <li>IP finder to share info about nodes IP addresses - * (see {@link #setIpFinder(org.gridgain.grid.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder)}). 
- * See the following IP finder implementations for details on configuration: - * <ul> - * <li>{@link org.gridgain.grid.spi.discovery.tcp.ipfinder.sharedfs.TcpDiscoverySharedFsIpFinder}</li> - * <li>{@gglink org.gridgain.grid.spi.discovery.tcp.ipfinder.s3.GridTcpDiscoveryS3IpFinder}</li> - * <li>{@link org.gridgain.grid.spi.discovery.tcp.ipfinder.jdbc.TcpDiscoveryJdbcIpFinder}</li> - * <li>{@link org.gridgain.grid.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder}</li> - * <li>{@link org.gridgain.grid.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder} - default</li> - * </ul> - * </li> - * </ul> - * <ul> - * <li>Local address (see {@link #setLocalAddress(String)})</li> - * <li>Local port to bind to (see {@link #setLocalPort(int)})</li> - * <li>Local port range to try binding to if previous ports are in use - * (see {@link #setLocalPortRange(int)})</li> - * <li>Heartbeat frequency (see {@link #setHeartbeatFrequency(long)})</li> - * <li>Max missed heartbeats (see {@link #setMaxMissedHeartbeats(int)})</li> - * <li>Number of times node tries to (re)establish connection to another node - * (see {@link #setReconnectCount(int)})</li> - * <li>Network timeout (see {@link #setNetworkTimeout(long)})</li> - * <li>Socket timeout (see {@link #setSocketTimeout(long)})</li> - * <li>Message acknowledgement timeout (see {@link #setAckTimeout(long)})</li> - * <li>Maximum message acknowledgement timeout (see {@link #setMaxAckTimeout(long)})</li> - * <li>Join timeout (see {@link #setJoinTimeout(long)})</li> - * <li>Thread priority for threads started by SPI (see {@link #setThreadPriority(int)})</li> - * <li>IP finder clean frequency (see {@link #setIpFinderCleanFrequency(long)})</li> - * <li>Statistics print frequency (see {@link #setStatisticsPrintFrequency(long)})</li> - * </ul> - * <h2 class="header">Java Example</h2> - * <pre name="code" class="java"> - * GridTcpDiscoverySpi spi = new GridTcpDiscoverySpi(); - * - * GridTcpDiscoveryVmIpFinder finder = - 
* new GridTcpDiscoveryVmIpFinder(); - * - * spi.setIpFinder(finder); - * - * GridConfiguration cfg = new GridConfiguration(); - * - * // Override default discovery SPI. - * cfg.setDiscoverySpi(spi); - * - * // Start grid. - * GridGain.start(cfg); - * </pre> - * <h2 class="header">Spring Example</h2> - * GridTcpDiscoverySpi can be configured from Spring XML configuration file: - * <pre name="code" class="xml"> - * <bean id="grid.custom.cfg" class="org.gridgain.grid.GridConfiguration" singleton="true"> - * ... - * <property name="discoverySpi"> - * <bean class="org.gridgain.grid.spi.discovery.tcp.GridTcpDiscoverySpi"> - * <property name="ipFinder"> - * <bean class="org.gridgain.grid.spi.discovery.tcp.ipfinder.vm.GridTcpDiscoveryVmIpFinder" /> - * </property> - * </bean> - * </property> - * ... - * </bean> - * </pre> - * <p> - * <img src="http://www.gridgain.com/images/spring-small.png"> - * <br> - * For information about Spring framework visit <a href="http://www.springframework.org/">www.springframework.org</a> - * @see org.gridgain.grid.spi.discovery.DiscoverySpi - */ -@SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext") -@IgniteSpiMultipleInstancesSupport(true) -@DiscoverySpiOrderSupport(true) -@DiscoverySpiHistorySupport(true) -public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscoverySpiMBean { - /** Default local port range (value is <tt>100</tt>). */ - public static final int DFLT_PORT_RANGE = 100; - - /** Default timeout for joining topology (value is <tt>0</tt>). */ - public static final long DFLT_JOIN_TIMEOUT = 0; - - /** Default reconnect attempts count (value is <tt>10</tt>). */ - public static final int DFLT_RECONNECT_CNT = 10; - - /** Default max heartbeats count node can miss without initiating status check (value is <tt>1</tt>). */ - public static final int DFLT_MAX_MISSED_HEARTBEATS = 1; - - /** Default max heartbeats count node can miss without failing client node (value is <tt>5</tt>). 
*/ - public static final int DFLT_MAX_MISSED_CLIENT_HEARTBEATS = 5; - - /** Default IP finder clean frequency in milliseconds (value is <tt>60,000ms</tt>). */ - public static final long DFLT_IP_FINDER_CLEAN_FREQ = 60 * 1000; - - /** Default statistics print frequency in milliseconds (value is <tt>0ms</tt>). */ - public static final long DFLT_STATS_PRINT_FREQ = 0; - - /** Maximum ack timeout value for receiving message acknowledgement in milliseconds (value is <tt>600,000ms</tt>). */ - public static final long DFLT_MAX_ACK_TIMEOUT = 10 * 60 * 1000; - - /** Node attribute that is mapped to node's external addresses (value is <tt>disc.tcp.ext-addrs</tt>). */ - public static final String ATTR_EXT_ADDRS = "disc.tcp.ext-addrs"; - - /** Address resolver. */ - private IgniteAddressResolver addrRslvr; - - /** Local port which node uses. */ - private int locPort = DFLT_PORT; - - /** Local port range. */ - private int locPortRange = DFLT_PORT_RANGE; - - /** Statistics print frequency. */ - @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized", "RedundantFieldInitialization"}) - private long statsPrintFreq = DFLT_STATS_PRINT_FREQ; - - /** Maximum message acknowledgement timeout. */ - private long maxAckTimeout = DFLT_MAX_ACK_TIMEOUT; - - /** Join timeout. */ - @SuppressWarnings("RedundantFieldInitialization") - private long joinTimeout = DFLT_JOIN_TIMEOUT; - - /** Max heartbeats count node can miss without initiating status check. */ - private int maxMissedHbs = DFLT_MAX_MISSED_HEARTBEATS; - - /** Max heartbeats count node can miss without failing client node. */ - private int maxMissedClientHbs = DFLT_MAX_MISSED_CLIENT_HEARTBEATS; - - /** IP finder clean frequency. */ - @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) - private long ipFinderCleanFreq = DFLT_IP_FINDER_CLEAN_FREQ; - - /** Reconnect attempts count. 
*/ - @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) - private int reconCnt = DFLT_RECONNECT_CNT; - - /** Grid marshaller. */ - @IgniteMarshallerResource - private IgniteMarshaller gridMarsh; - - /** Nodes ring. */ - @GridToStringExclude - private final TcpDiscoveryNodesRing ring = new TcpDiscoveryNodesRing(); - - /** Topology snapshots history. */ - private final SortedMap<Long, Collection<ClusterNode>> topHist = new TreeMap<>(); - - /** Socket readers. */ - private final Collection<SocketReader> readers = new LinkedList<>(); - - /** TCP server for discovery SPI. */ - private TcpServer tcpSrvr; - - /** Message worker. */ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private RingMessageWorker msgWorker; - - /** Client message workers. */ - private ConcurrentMap<UUID, ClientMessageWorker> clientMsgWorkers = new ConcurrentHashMap8<>(); - - /** Metrics sender. */ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private HeartbeatsSender hbsSnd; - - /** Status checker. */ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private CheckStatusSender chkStatusSnd; - - /** IP finder cleaner. */ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private IpFinderCleaner ipFinderCleaner; - - /** Statistics printer thread. */ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private StatisticsPrinter statsPrinter; - - /** Failed nodes (but still in topology). */ - private Collection<TcpDiscoveryNode> failedNodes = new HashSet<>(); - - /** Leaving nodes (but still in topology). */ - private Collection<TcpDiscoveryNode> leavingNodes = new HashSet<>(); - - /** If non-shared IP finder is used this flag shows whether IP finder contains local address. */ - private boolean ipFinderHasLocAddr; - - /** Addresses that do not respond during join requests send (for resolving concurrent start). 
*/ - private final Collection<SocketAddress> noResAddrs = new GridConcurrentHashSet<>(); - - /** Addresses that incoming join requests were sent from (for resolving concurrent start). */ - private final Collection<SocketAddress> fromAddrs = new GridConcurrentHashSet<>(); - - /** Response on join request from coordinator (in case of duplicate ID or auth failure). */ - private final GridTuple<TcpDiscoveryAbstractMessage> joinRes = F.t1(); - - /** Context initialization latch. */ - @GridToStringExclude - private final CountDownLatch ctxInitLatch = new CountDownLatch(1); - - /** Node authenticator. */ - private DiscoverySpiNodeAuthenticator nodeAuth; - - /** Mutex. */ - private final Object mux = new Object(); - - /** Map of ping requests currently in progress, keyed by the address being pinged. */ - private final ConcurrentMap<InetSocketAddress, IgniteFuture<IgniteBiTuple<UUID, Boolean>>> pingMap = - new ConcurrentHashMap8<>(); - - /** Debug mode. */ - private boolean debugMode; - - /** Debug messages history. */ - private int debugMsgHist = 512; - - /** Received messages. */ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private ConcurrentLinkedDeque<String> debugLog; - - /** - * Sets address resolver. - * - * @param addrRslvr Address resolver. - */ - @IgniteSpiConfiguration(optional = true) - @IgniteAddressResolverResource - public void setAddressResolver(IgniteAddressResolver addrRslvr) { - // Injection should not override value already set by Spring or user. - if (this.addrRslvr == null) - this.addrRslvr = addrRslvr; - } - - /** - * Gets address resolver. - * - * @return Address resolver. - */ - public IgniteAddressResolver getAddressResolver() { - return addrRslvr; - } - - /** {@inheritDoc} */ - @Override public int getReconnectCount() { - return reconCnt; - } - - /** - * Number of times node tries to (re)establish connection to another node. - * <p> - * Note that SPI implementation will increase {@link #ackTimeout} by factor 2 - * on every retry. 
- * <p> - * If not specified, default is {@link #DFLT_RECONNECT_CNT}. - * - * @param reconCnt Number of retries during message sending. - * @see #setAckTimeout(long) - */ - @IgniteSpiConfiguration(optional = true) - public void setReconnectCount(int reconCnt) { - this.reconCnt = reconCnt; - } - - /** {@inheritDoc} */ - @Override public long getMaxAckTimeout() { - return maxAckTimeout; - } - - /** - * Sets maximum timeout for receiving acknowledgement for sent message. - * <p> - * If acknowledgement is not received within this timeout, sending is considered as failed - * and SPI tries to repeat message sending. Every time SPI retries message sending, ack - * timeout will be increased. If no acknowledgement is received and {@code maxAckTimeout} - * is reached, then the process of message sending is considered as failed. - * <p> - * If not specified, default is {@link #DFLT_MAX_ACK_TIMEOUT}. - * - * @param maxAckTimeout Maximum acknowledgement timeout. - */ - @IgniteSpiConfiguration(optional = true) - public void setMaxAckTimeout(long maxAckTimeout) { - this.maxAckTimeout = maxAckTimeout; - } - - /** {@inheritDoc} */ - @Override public long getJoinTimeout() { - return joinTimeout; - } - - /** - * Sets join timeout. - * <p> - * If non-shared IP finder is used and node fails to connect to - * any address from IP finder, node keeps trying to join within this - * timeout. If all addresses are still unresponsive, exception is thrown - * and node startup fails. - * <p> - * If not specified, default is {@link #DFLT_JOIN_TIMEOUT}. - * - * @param joinTimeout Join timeout ({@code 0} means wait forever). - * - * @see org.gridgain.grid.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder#isShared() - */ - @IgniteSpiConfiguration(optional = true) - public void setJoinTimeout(long joinTimeout) { - this.joinTimeout = joinTimeout; - } - - /** {@inheritDoc} */ - @Override public int getLocalPort() { - TcpDiscoveryNode locNode0 = locNode; - - return locNode0 != null ? 
locNode0.discoveryPort() : 0; - } - - /** - * Sets local port to listen to. - * <p> - * If not specified, default is {@link #DFLT_PORT}. - * - * @param locPort Local port to bind. - */ - @IgniteSpiConfiguration(optional = true) - public void setLocalPort(int locPort) { - this.locPort = locPort; - } - - /** {@inheritDoc} */ - @Override public int getLocalPortRange() { - return locPortRange; - } - - /** - * Range for local ports. Local node will try to bind on first available port - * starting from {@link #getLocalPort()} up until - * <tt>{@link #getLocalPort()} {@code + locPortRange}</tt>. - * <p> - * If not specified, default is {@link #DFLT_PORT_RANGE}. - * - * @param locPortRange Local port range to bind. - */ - @IgniteSpiConfiguration(optional = true) - public void setLocalPortRange(int locPortRange) { - this.locPortRange = locPortRange; - } - - /** {@inheritDoc} */ - @Override public int getMaxMissedHeartbeats() { - return maxMissedHbs; - } - - /** - * Sets max heartbeats count node can miss without initiating status check. - * <p> - * If not provided, default value is {@link #DFLT_MAX_MISSED_HEARTBEATS}. - * - * @param maxMissedHbs Max missed heartbeats. - */ - @IgniteSpiConfiguration(optional = true) - public void setMaxMissedHeartbeats(int maxMissedHbs) { - this.maxMissedHbs = maxMissedHbs; - } - - /** {@inheritDoc} */ - @Override public int getMaxMissedClientHeartbeats() { - return maxMissedClientHbs; - } - - /** - * Sets max heartbeats count node can miss without failing client node. - * <p> - * If not provided, default value is {@link #DFLT_MAX_MISSED_CLIENT_HEARTBEATS}. - * - * @param maxMissedClientHbs Max missed client heartbeats. - */ - @IgniteSpiConfiguration(optional = true) - public void setMaxMissedClientHeartbeats(int maxMissedClientHbs) { - this.maxMissedClientHbs = maxMissedClientHbs; - } - - /** {@inheritDoc} */ - @Override public long getStatisticsPrintFrequency() { - return statsPrintFreq; - } - - /** - * Sets statistics print frequency. 
- * <p> - * If not set, default value is {@link #DFLT_STATS_PRINT_FREQ}. - * 0 indicates that no print is required. If value is greater than 0 and log is - * not quiet then statistics are printed out with INFO level. - * <p> - * This may be very helpful for tracing topology problems. - * - * @param statsPrintFreq Statistics print frequency in milliseconds. - */ - @IgniteSpiConfiguration(optional = true) - public void setStatisticsPrintFrequency(long statsPrintFreq) { - this.statsPrintFreq = statsPrintFreq; - } - - /** {@inheritDoc} */ - @Override public long getIpFinderCleanFrequency() { - return ipFinderCleanFreq; - } - - /** - * Sets IP finder clean frequency in milliseconds. - * <p> - * If not provided, default value is {@link #DFLT_IP_FINDER_CLEAN_FREQ}. - * - * @param ipFinderCleanFreq IP finder clean frequency. - */ - @IgniteSpiConfiguration(optional = true) - public void setIpFinderCleanFrequency(long ipFinderCleanFreq) { - this.ipFinderCleanFreq = ipFinderCleanFreq; - } - - /** - * This method is intended for troubleshooting purposes only. - * - * @param debugMode {@code True} to start SPI in debug mode. - */ - public void setDebugMode(boolean debugMode) { - this.debugMode = debugMode; - } - - /** - * This method is intended for troubleshooting purposes only. - * - * @param debugMsgHist Message history log size. 
- */ - public void setDebugMessageHistory(int debugMsgHist) { - this.debugMsgHist = debugMsgHist; - } - - /** {@inheritDoc} */ - @Override public String getSpiState() { - synchronized (mux) { - return spiState.name(); - } - } - - /** {@inheritDoc} */ - @Override public long getSocketTimeout() { - return sockTimeout; - } - - /** {@inheritDoc} */ - @Override public long getAckTimeout() { - return ackTimeout; - } - - /** {@inheritDoc} */ - @Override public long getNetworkTimeout() { - return netTimeout; - } - - /** {@inheritDoc} */ - @Override public int getThreadPriority() { - return threadPri; - } - - /** {@inheritDoc} */ - @Override public long getHeartbeatFrequency() { - return hbFreq; - } - - /** {@inheritDoc} */ - @Override public String getIpFinderFormatted() { - return ipFinder.toString(); - } - - /** {@inheritDoc} */ - @Override public int getMessageWorkerQueueSize() { - return msgWorker.queueSize(); - } - - /** {@inheritDoc} */ - @Override public long getNodesJoined() { - return stats.joinedNodesCount(); - } - - /** {@inheritDoc} */ - @Override public long getNodesLeft() { - return stats.leftNodesCount(); - } - - /** {@inheritDoc} */ - @Override public long getNodesFailed() { - return stats.failedNodesCount(); - } - - /** {@inheritDoc} */ - @Override public long getPendingMessagesRegistered() { - return stats.pendingMessagesRegistered(); - } - - /** {@inheritDoc} */ - @Override public long getPendingMessagesDiscarded() { - return stats.pendingMessagesDiscarded(); - } - - /** {@inheritDoc} */ - @Override public long getAvgMessageProcessingTime() { - return stats.avgMessageProcessingTime(); - } - - /** {@inheritDoc} */ - @Override public long getMaxMessageProcessingTime() { - return stats.maxMessageProcessingTime(); - } - - /** {@inheritDoc} */ - @Override public int getTotalReceivedMessages() { - return stats.totalReceivedMessages(); - } - - /** {@inheritDoc} */ - @Override public Map<String, Integer> getReceivedMessages() { - return stats.receivedMessages(); 
- } - - /** {@inheritDoc} */ - @Override public int getTotalProcessedMessages() { - return stats.totalProcessedMessages(); - } - - /** {@inheritDoc} */ - @Override public Map<String, Integer> getProcessedMessages() { - return stats.processedMessages(); - } - - /** {@inheritDoc} */ - @Override public long getCoordinatorSinceTimestamp() { - return stats.coordinatorSinceTimestamp(); - } - - /** {@inheritDoc} */ - @Nullable @Override public UUID getCoordinator() { - TcpDiscoveryNode crd = resolveCoordinator(); - - return crd != null ? crd.id() : null; - } - - /** {@inheritDoc} */ - @Nullable @Override public ClusterNode getNode(UUID nodeId) { - assert nodeId != null; - - UUID locNodeId0 = locNodeId; - - if (locNodeId0 != null && locNodeId0.equals(nodeId)) - // Return local node directly. - return locNode; - - TcpDiscoveryNode node = ring.node(nodeId); - - if (node != null && !node.visible()) - return null; - - return node; - } - - /** {@inheritDoc} */ - @Override public Collection<ClusterNode> getRemoteNodes() { - return F.<TcpDiscoveryNode, ClusterNode>upcast(ring.visibleRemoteNodes()); - } - - /** {@inheritDoc} */ - @Override public Collection<Object> injectables() { - Collection<Object> res = new LinkedList<>(); - - if (ipFinder != null) - res.add(ipFinder); - - return res; - } - - /** {@inheritDoc} */ - @Override public void spiStart(String gridName) throws IgniteSpiException { - spiStart0(false); - } - - /** - * Starts or restarts SPI after stop (to reconnect). - * - * @param restart {@code True} if SPI is restarted after stop. - * @throws org.apache.ignite.spi.IgniteSpiException If failed. - */ - private void spiStart0(boolean restart) throws IgniteSpiException { - if (!restart) - // It is initial start. 
- onSpiStart(); - - synchronized (mux) { - spiState = DISCONNECTED; - } - - if (debugMode) { - if (!log.isInfoEnabled()) - throw new IgniteSpiException("Info log level should be enabled for TCP discovery to work " + - "in debug mode."); - - debugLog = new ConcurrentLinkedDeque<>(); - - U.quietAndWarn(log, "TCP discovery SPI is configured in debug mode."); - } - - // Clear addresses collections. - fromAddrs.clear(); - noResAddrs.clear(); - - sockTimeoutWorker = new SocketTimeoutWorker(); - sockTimeoutWorker.start(); - - msgWorker = new RingMessageWorker(); - msgWorker.start(); - - tcpSrvr = new TcpServer(); - - // Init local node. - IgniteBiTuple<Collection<String>, Collection<String>> addrs; - - try { - addrs = U.resolveLocalAddresses(locHost); - } - catch (IOException | GridException e) { - throw new IgniteSpiException("Failed to resolve local host to set of external addresses: " + locHost, e); - } - - locNode = new TcpDiscoveryNode( - locNodeId, - addrs.get1(), - addrs.get2(), - tcpSrvr.port, - metricsProvider, - locNodeVer); - - try { - Collection<InetSocketAddress> extAddrs = addrRslvr == null ? null : - U.resolveAddresses(addrRslvr, F.flat(Arrays.asList(addrs.get1(), addrs.get2())), - locNode.discoveryPort()); - - if (extAddrs != null) - locNodeAttrs.put(createSpiAttributeName(ATTR_EXT_ADDRS), extAddrs); - } - catch (GridException e) { - throw new IgniteSpiException("Failed to resolve local host to addresses: " + locHost, e); - } - - locNode.setAttributes(locNodeAttrs); - - locNode.local(true); - - locNodeAddrs = getNodeAddresses(locNode); - - if (log.isDebugEnabled()) - log.debug("Local node initialized: " + locNode); - - // Start TCP server thread after local node is initialized. 
- tcpSrvr.start(); - - ring.localNode(locNode); - - if (ipFinder.isShared()) - registerLocalNodeAddress(); - else { - if (F.isEmpty(ipFinder.getRegisteredAddresses())) - throw new IgniteSpiException("Non-shared IP finder must have IP addresses specified in " + - "GridTcpDiscoveryIpFinder.getRegisteredAddresses() configuration property " + - "(specify list of IP addresses in configuration)."); - - ipFinderHasLocAddr = ipFinderHasLocalAddress(); - } - - if (statsPrintFreq > 0 && log.isInfoEnabled()) { - statsPrinter = new StatisticsPrinter(); - statsPrinter.start(); - } - - stats.onJoinStarted(); - - joinTopology(); - - stats.onJoinFinished(); - - hbsSnd = new HeartbeatsSender(); - hbsSnd.start(); - - chkStatusSnd = new CheckStatusSender(); - chkStatusSnd.start(); - - if (ipFinder.isShared()) { - ipFinderCleaner = new IpFinderCleaner(); - ipFinderCleaner.start(); - } - - if (log.isDebugEnabled() && !restart) - log.debug(startInfo()); - - if (restart) - getSpiContext().registerPort(tcpSrvr.port, TCP); - } - - /** - * @throws org.apache.ignite.spi.IgniteSpiException If failed. - */ - @SuppressWarnings("BusyWait") - private void registerLocalNodeAddress() throws IgniteSpiException { - // Make sure address registration succeeded. - while (true) { - try { - ipFinder.initializeLocalAddresses(locNode.socketAddresses()); - - // Success. - break; - } - catch (IllegalStateException e) { - throw new IgniteSpiException("Failed to register local node address with IP finder: " + - locNode.socketAddresses(), e); - } - catch (IgniteSpiException e) { - LT.error(log, e, "Failed to register local node address in IP finder on start " + - "(retrying every 2000 ms)."); - } - - try { - U.sleep(2000); - } - catch (GridInterruptedException e) { - throw new IgniteSpiException("Thread has been interrupted.", e); - } - } - } - - /** - * @throws org.apache.ignite.spi.IgniteSpiException If failed. 
- */ - private void onSpiStart() throws IgniteSpiException { - startStopwatch(); - - assertParameter(ipFinder != null, "ipFinder != null"); - assertParameter(ipFinderCleanFreq > 0, "ipFinderCleanFreq > 0"); - assertParameter(locPort > 1023, "localPort > 1023"); - assertParameter(locPortRange >= 0, "localPortRange >= 0"); - assertParameter(locPort + locPortRange <= 0xffff, "locPort + locPortRange <= 0xffff"); - assertParameter(netTimeout > 0, "networkTimeout > 0"); - assertParameter(sockTimeout > 0, "sockTimeout > 0"); - assertParameter(ackTimeout > 0, "ackTimeout > 0"); - assertParameter(maxAckTimeout > ackTimeout, "maxAckTimeout > ackTimeout"); - assertParameter(reconCnt > 0, "reconnectCnt > 0"); - assertParameter(hbFreq > 0, "heartbeatFreq > 0"); - assertParameter(maxMissedHbs > 0, "maxMissedHeartbeats > 0"); - assertParameter(maxMissedClientHbs > 0, "maxMissedClientHeartbeats > 0"); - assertParameter(threadPri > 0, "threadPri > 0"); - assertParameter(statsPrintFreq >= 0, "statsPrintFreq >= 0"); - - try { - locHost = U.resolveLocalHost(locAddr); - } - catch (IOException e) { - throw new IgniteSpiException("Unknown local address: " + locAddr, e); - } - - if (log.isDebugEnabled()) { - log.debug(configInfo("localHost", locHost.getHostAddress())); - log.debug(configInfo("localPort", locPort)); - log.debug(configInfo("localPortRange", locPortRange)); - log.debug(configInfo("threadPri", threadPri)); - log.debug(configInfo("networkTimeout", netTimeout)); - log.debug(configInfo("sockTimeout", sockTimeout)); - log.debug(configInfo("ackTimeout", ackTimeout)); - log.debug(configInfo("maxAckTimeout", maxAckTimeout)); - log.debug(configInfo("reconnectCount", reconCnt)); - log.debug(configInfo("ipFinder", ipFinder)); - log.debug(configInfo("ipFinderCleanFreq", ipFinderCleanFreq)); - log.debug(configInfo("heartbeatFreq", hbFreq)); - log.debug(configInfo("maxMissedHeartbeats", maxMissedHbs)); - log.debug(configInfo("statsPrintFreq", statsPrintFreq)); - } - - // Warn on odd 
network timeout. - if (netTimeout < 3000) - U.warn(log, "Network timeout is too low (at least 3000 ms recommended): " + netTimeout); - - // Warn on odd heartbeat frequency. - if (hbFreq < 2000) - U.warn(log, "Heartbeat frequency is too high (at least 2000 ms recommended): " + hbFreq); - - registerMBean(gridName, this, TcpDiscoverySpiMBean.class); - - if (ipFinder instanceof TcpDiscoveryMulticastIpFinder) { - TcpDiscoveryMulticastIpFinder mcastIpFinder = ((TcpDiscoveryMulticastIpFinder)ipFinder); - - if (mcastIpFinder.getLocalAddress() == null) - mcastIpFinder.setLocalAddress(locAddr); - } - } - - /** {@inheritDoc} */ - @Override public void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException { - super.onContextInitialized0(spiCtx); - - ctxInitLatch.countDown(); - - spiCtx.registerPort(tcpSrvr.port, TCP); - } - - /** {@inheritDoc} */ - @Override public IgniteSpiContext getSpiContext() { - if (ctxInitLatch.getCount() > 0) { - if (log.isDebugEnabled()) - log.debug("Waiting for context initialization."); - - try { - U.await(ctxInitLatch); - - if (log.isDebugEnabled()) - log.debug("Context has been initialized."); - } - catch (GridInterruptedException e) { - U.warn(log, "Thread has been interrupted while waiting for SPI context initialization.", e); - } - } - - return super.getSpiContext(); - } - - /** {@inheritDoc} */ - @Override public void spiStop() throws IgniteSpiException { - spiStop0(false); - } - - /** - * Stops SPI finally or stops SPI for restart. - * - * @param disconnect {@code True} if SPI is being disconnected. - * @throws org.apache.ignite.spi.IgniteSpiException If failed. - */ - private void spiStop0(boolean disconnect) throws IgniteSpiException { - if (ctxInitLatch.getCount() > 0) - // Safety. 
- ctxInitLatch.countDown(); - - if (log.isDebugEnabled()) { - if (disconnect) - log.debug("Disconnecting SPI."); - else - log.debug("Preparing to start local node stop procedure."); - } - - if (disconnect) { - synchronized (mux) { - spiState = DISCONNECTING; - } - } - - if (msgWorker != null && msgWorker.isAlive() && !disconnect) { - // Send node left message only if it is final stop. - msgWorker.addMessage(new TcpDiscoveryNodeLeftMessage(locNodeId)); - - synchronized (mux) { - long threshold = U.currentTimeMillis() + netTimeout; - - long timeout = netTimeout; - - while (spiState != LEFT && timeout > 0) { - try { - mux.wait(timeout); - - timeout = threshold - U.currentTimeMillis(); - } - catch (InterruptedException ignored) { - Thread.currentThread().interrupt(); - - break; - } - } - - if (spiState == LEFT) { - if (log.isDebugEnabled()) - log.debug("Verification for local node leave has been received from coordinator" + - " (continuing stop procedure)."); - } - else if (log.isInfoEnabled()) { - log.info("No verification for local node leave has been received from coordinator" + - " (will stop node anyway)."); - } - } - } - - U.interrupt(tcpSrvr); - U.join(tcpSrvr, log); - - Collection<SocketReader> tmp; - - synchronized (mux) { - tmp = U.arrayList(readers); - } - - U.interrupt(tmp); - U.joinThreads(tmp, log); - - U.interrupt(hbsSnd); - U.join(hbsSnd, log); - - U.interrupt(chkStatusSnd); - U.join(chkStatusSnd, log); - - U.interrupt(ipFinderCleaner); - U.join(ipFinderCleaner, log); - - U.interrupt(msgWorker); - U.join(msgWorker, log); - - U.interrupt(sockTimeoutWorker); - U.join(sockTimeoutWorker, log); - - U.interrupt(statsPrinter); - U.join(statsPrinter, log); - - if (ipFinder != null) - ipFinder.close(); - - Collection<TcpDiscoveryNode> rmts = null; - - if (!disconnect) { - // This is final stop. 
- unregisterMBean(); - - if (log.isDebugEnabled()) - log.debug(stopInfo()); - } - else { - getSpiContext().deregisterPorts(); - - rmts = ring.visibleRemoteNodes(); - } - - long topVer = ring.topologyVersion(); - - ring.clear(); - - if (rmts != null && !rmts.isEmpty()) { - // This is restart/disconnection and remote nodes are not empty. - // We need to fire FAIL event for each. - DiscoverySpiListener lsnr = this.lsnr; - - if (lsnr != null) { - Collection<ClusterNode> processed = new LinkedList<>(); - - for (TcpDiscoveryNode n : rmts) { - assert n.visible(); - - processed.add(n); - - Collection<ClusterNode> top = F.viewReadOnly(rmts, F.<ClusterNode>identity(), F.notIn(processed)); - - topVer++; - - Map<Long, Collection<ClusterNode>> hist = updateTopologyHistory(topVer, top); - - lsnr.onDiscovery(EVT_NODE_FAILED, topVer, n, top, hist); - } - } - } - - printStatistics(); - - stats.clear(); - - synchronized (mux) { - // Clear stored data. - leavingNodes.clear(); - failedNodes.clear(); - - spiState = DISCONNECTED; - } - } - - /** {@inheritDoc} */ - @Override protected void onContextDestroyed0() { - super.onContextDestroyed0(); - - if (ctxInitLatch.getCount() > 0) - // Safety. - ctxInitLatch.countDown(); - - getSpiContext().deregisterPorts(); - } - - /** - * @throws org.apache.ignite.spi.IgniteSpiException If any error occurs. - * @return {@code true} if IP finder contains local address. - */ - private boolean ipFinderHasLocalAddress() throws IgniteSpiException { - for (InetSocketAddress locAddr : locNodeAddrs) { - for (InetSocketAddress addr : registeredAddresses()) - try { - int port = addr.getPort(); - - InetSocketAddress resolved = addr.isUnresolved() ? - new InetSocketAddress(InetAddress.getByName(addr.getHostName()), port) : - new InetSocketAddress(addr.getAddress(), port); - - if (resolved.equals(locAddr)) - return true; - } - catch (UnknownHostException ignored) { - // No-op. 
- } - } - - return false; - } - - /** {@inheritDoc} */ - @Override public boolean pingNode(UUID nodeId) { - assert nodeId != null; - - if (nodeId == locNodeId) - return true; - - TcpDiscoveryNode node = ring.node(nodeId); - - if (node == null || !node.visible()) - return false; - - boolean res = pingNode(node); - - if (!res && !node.isClient()) { - LT.warn(log, null, "Failed to ping node (status check will be initiated): " + nodeId); - - msgWorker.addMessage(new TcpDiscoveryStatusCheckMessage(locNode, node.id())); - } - - return res; - } - - /** - * Pings the remote node to see if it's alive. - * - * @param node Node. - * @return {@code True} if ping succeeds. - */ - private boolean pingNode(TcpDiscoveryNode node) { - assert node != null; - - if (node.id().equals(locNodeId)) - return true; - - UUID clientNodeId = null; - - if (node.isClient()) { - clientNodeId = node.id(); - - node = ring.node(node.clientRouterNodeId()); - - if (node == null || !node.visible()) - return false; - } - - for (InetSocketAddress addr : getNodeAddresses(node, U.sameMacs(locNode, node))) { - try { - // ID returned by the node should be the same as ID of the parameter for ping to succeed. - IgniteBiTuple<UUID, Boolean> t = pingNode(addr, clientNodeId); - - return node.id().equals(t.get1()) && (clientNodeId == null || t.get2()); - } - catch (GridException e) { - if (log.isDebugEnabled()) - log.debug("Failed to ping node [node=" + node + ", err=" + e.getMessage() + ']'); - - // continue; - } - } - - return false; - } - - /** - * Pings the remote node by its address to see if it's alive. - * - * @param addr Address of the node. - * @return ID of the remote node if node alive. - * @throws org.apache.ignite.spi.IgniteSpiException If an error occurs. 
- */ - private IgniteBiTuple<UUID, Boolean> pingNode(InetSocketAddress addr, @Nullable UUID clientNodeId) - throws GridException { - assert addr != null; - - if (F.contains(locNodeAddrs, addr)) - return F.t(locNodeId, false); - - GridFutureAdapterEx<IgniteBiTuple<UUID, Boolean>> fut = new GridFutureAdapterEx<>(); - - IgniteFuture<IgniteBiTuple<UUID, Boolean>> oldFut = pingMap.putIfAbsent(addr, fut); - - if (oldFut != null) - return oldFut.get(); - else { - Collection<Throwable> errs = null; - - try { - Socket sock = null; - - for (int i = 0; i < reconCnt; i++) { - try { - if (addr.isUnresolved()) - addr = new InetSocketAddress(InetAddress.getByName(addr.getHostName()), addr.getPort()); - - long tstamp = U.currentTimeMillis(); - - sock = openSocket(addr); - - writeToSocket(sock, new TcpDiscoveryPingRequest(locNodeId, clientNodeId)); - - TcpDiscoveryPingResponse res = readMessage(sock, null, netTimeout); - - if (locNodeId.equals(res.creatorNodeId())) { - if (log.isDebugEnabled()) - log.debug("Ping response from local node: " + res); - - break; - } - - stats.onClientSocketInitialized(U.currentTimeMillis() - tstamp); - - IgniteBiTuple<UUID, Boolean> t = F.t(res.creatorNodeId(), res.clientExists()); - - fut.onDone(t); - - return t; - } - catch (IOException | GridException e) { - if (errs == null) - errs = new ArrayList<>(); - - errs.add(e); - } - finally { - U.closeQuiet(sock); - } - } - } - catch (Throwable t) { - fut.onDone(t); - - throw U.cast(t); - } - finally { - if (!fut.isDone()) - fut.onDone(U.exceptionWithSuppressed("Failed to ping node by address: " + addr, errs)); - - boolean b = pingMap.remove(addr, fut); - - assert b; - } - - return fut.get(); - } - } - - /** {@inheritDoc} */ - @Override public void disconnect() throws IgniteSpiException { - spiStop0(true); - } - - /** {@inheritDoc} */ - @Override public void setAuthenticator(DiscoverySpiNodeAuthenticator nodeAuth) { - this.nodeAuth = nodeAuth; - } - - /** - * Tries to join this node to topology. 
- * - * @throws org.apache.ignite.spi.IgniteSpiException If any error occurs. - */ - private void joinTopology() throws IgniteSpiException { - synchronized (mux) { - assert spiState == CONNECTING || spiState == DISCONNECTED; - - spiState = CONNECTING; - } - - GridSecurityCredentials locCred = (GridSecurityCredentials)locNode.getAttributes() - .get(GridNodeAttributes.ATTR_SECURITY_CREDENTIALS); - - // Marshal credentials for backward compatibility and security. - marshalCredentials(locNode); - - while (true) { - if (!sendJoinRequestMessage()) { - if (log.isDebugEnabled()) - log.debug("Join request message has not been sent (local node is the first in the topology)."); - - // Authenticate local node. - try { - GridSecurityContext subj = nodeAuth.authenticateNode(locNode, locCred); - - if (subj == null) - throw new IgniteSpiException("Authentication failed for local node: " + locNode.id()); - - Map<String, Object> attrs = new HashMap<>(locNode.attributes()); - - attrs.put(GridNodeAttributes.ATTR_SECURITY_SUBJECT, gridMarsh.marshal(subj)); - attrs.remove(GridNodeAttributes.ATTR_SECURITY_CREDENTIALS); - - locNode.setAttributes(attrs); - } - catch (GridException e) { - throw new IgniteSpiException("Failed to authenticate local node (will shutdown local node).", e); - } - - locNode.order(1); - locNode.internalOrder(1); - - gridStartTime = U.currentTimeMillis(); - - locNode.visible(true); - - ring.clear(); - - ring.topologyVersion(1); - - synchronized (mux) { - topHist.clear(); - - spiState = CONNECTED; - - mux.notifyAll(); - } - - notifyDiscovery(EVT_NODE_JOINED, 1, locNode); - - break; - } - - if (log.isDebugEnabled()) - log.debug("Join request message has been sent (waiting for coordinator response)."); - - synchronized (mux) { - long threshold = U.currentTimeMillis() + netTimeout; - - long timeout = netTimeout; - - while (spiState == CONNECTING && timeout > 0) { - try { - mux.wait(timeout); - - timeout = threshold - U.currentTimeMillis(); - } - catch 
(InterruptedException ignored) { - Thread.currentThread().interrupt(); - - throw new IgniteSpiException("Thread has been interrupted."); - } - } - - if (spiState == CONNECTED) - break; - else if (spiState == DUPLICATE_ID) - throw duplicateIdError((TcpDiscoveryDuplicateIdMessage)joinRes.get()); - else if (spiState == AUTH_FAILED) - throw authenticationFailedError((TcpDiscoveryAuthFailedMessage)joinRes.get()); - else if (spiState == CHECK_FAILED) - throw checkFailedError((TcpDiscoveryCheckFailedMessage)joinRes.get()); - else if (spiState == LOOPBACK_PROBLEM) { - TcpDiscoveryLoopbackProblemMessage msg = (TcpDiscoveryLoopbackProblemMessage)joinRes.get(); - - boolean locHostLoopback = locHost.isLoopbackAddress(); - - String firstNode = locHostLoopback ? "local" : "remote"; - - String secondNode = locHostLoopback ? "remote" : "local"; - - throw new IgniteSpiException("Failed to add node to topology because " + firstNode + - " node is configured to use loopback address, but " + secondNode + " node is not " + - "(consider changing 'localAddress' configuration parameter) " + - "[locNodeAddrs=" + U.addressesAsString(locNode) + ", rmtNodeAddrs=" + - U.addressesAsString(msg.addresses(), msg.hostNames()) + ']'); - } - else - LT.warn(log, null, "Node has not been connected to topology and will repeat join process. " + - "Check remote nodes logs for possible error messages. " + - "Note that large topology may require significant time to start. " + - "Increase 'GridTcpDiscoverySpi.networkTimeout' configuration property " + - "if getting this message on the starting nodes [networkTimeout=" + netTimeout + ']'); - } - } - - assert locNode.order() != 0; - assert locNode.internalOrder() != 0; - - if (log.isDebugEnabled()) - log.debug("Discovery SPI has been connected to topology with order: " + locNode.internalOrder()); - } - - /** - * @param msg Error message. - * @return Remote grid version parsed from error message. 
- * @deprecated This method was created for preserving backward compatibility. During major version update - * parsing of error message should be replaced with new {@link org.gridgain.grid.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage} - * which contains all necessary information. - */ - @Deprecated - @Nullable private String parseRemoteVersion(String msg) { - msg = msg.replaceAll("\\s", ""); - - final String verPrefix = "rmtBuildVer="; - - int startIdx = msg.indexOf(verPrefix); - int endIdx = msg.indexOf(',', startIdx); - - if (endIdx < 0) - endIdx = msg.indexOf(']', startIdx); - - if (startIdx < 0 || endIdx < 0) - return null; - - return msg.substring(startIdx + verPrefix.length() - 1, endIdx); - } - - /** - * Tries to send join request message to a random node presenting in topology. - * Address is provided by {@link org.gridgain.grid.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder} and message is - * sent to first node connection succeeded to. - * - * @return {@code true} if send succeeded. - * @throws org.apache.ignite.spi.IgniteSpiException If any error occurs. - */ - @SuppressWarnings({"BusyWait"}) - private boolean sendJoinRequestMessage() throws IgniteSpiException { - TcpDiscoveryAbstractMessage joinReq = new TcpDiscoveryJoinRequestMessage(locNode, - exchange.collect(locNodeId)); - - // Time when it has been detected, that addresses from IP finder do not respond. - long noResStart = 0; - - while (true) { - Collection<InetSocketAddress> addrs = resolvedAddresses(); - - if (F.isEmpty(addrs)) - return false; - - boolean retry = false; - GridException errs = null; - - for (InetSocketAddress addr : addrs) { - try { - Integer res = sendMessageDirectly(joinReq, addr); - - assert res != null; - - noResAddrs.remove(addr); - - // Address is responsive, reset period start. - noResStart = 0; - - switch (res) { - case RES_WAIT: - // Concurrent startup, try sending join request again or wait if no success. 
- retry = true; - - break; - case RES_OK: - if (log.isDebugEnabled()) - log.debug("Join request message has been sent to address [addr=" + addr + - ", req=" + joinReq + ']'); - - // Join request sending succeeded, wait for response from topology. - return true; - - default: - // Concurrent startup, try next node. - if (res == RES_CONTINUE_JOIN) { - if (!fromAddrs.contains(addr)) - retry = true; - } - else { - if (log.isDebugEnabled()) - log.debug("Unexpected response to join request: " + res); - - retry = true; - } - - break; - } - } - catch (IgniteSpiException e) { - if (errs == null) - errs = new GridException("Multiple connection attempts failed."); - - errs.addSuppressed(e); - - if (log.isDebugEnabled()) { - IOException ioe = X.cause(e, IOException.class); - - log.debug("Failed to send join request message [addr=" + addr + - ", msg=" + ioe != null ? ioe.getMessage() : e.getMessage() + ']'); - } - - noResAddrs.add(addr); - } - } - - if (retry) { - if (log.isDebugEnabled()) - log.debug("Concurrent discovery SPI start has been detected (local node should wait)."); - - try { - U.sleep(2000); - } - catch (GridInterruptedException e) { - throw new IgniteSpiException("Thread has been interrupted.", e); - } - } - else if (!ipFinder.isShared() && !ipFinderHasLocAddr) { - if (errs != null && X.hasCause(errs, ConnectException.class)) - LT.warn(log, null, "Failed to connect to any address from IP finder " + - "(make sure IP finder addresses are correct and firewalls are disabled on all host machines): " + - addrs); - - if (joinTimeout > 0) { - if (noResStart == 0) - noResStart = U.currentTimeMillis(); - else if (U.currentTimeMillis() - noResStart > joinTimeout) - throw new IgniteSpiException( - "Failed to connect to any address from IP finder within join timeout " + - "(make sure IP finder addresses are correct, and operating system firewalls are disabled " + - "on all host machines, or consider increasing 'joinTimeout' configuration property): " + - addrs, errs); - } - - 
try { - U.sleep(2000); - } - catch (GridInterruptedException e) { - throw new IgniteSpiException("Thread has been interrupted.", e); - } - } - else - break; - } - - return false; - } - - /** - * Establishes connection to an address, sends message and returns the response (if any). - * - * @param msg Message to send. - * @param addr Address to send message to. - * @return Response read from the recipient or {@code null} if no response is supposed. - * @throws org.apache.ignite.spi.IgniteSpiException If an error occurs. - */ - @Nullable private Integer sendMessageDirectly(TcpDiscoveryAbstractMessage msg, InetSocketAddress addr) - throws IgniteSpiException { - assert msg != null; - assert addr != null; - - Collection<Throwable> errs = null; - - Socket sock = null; - - long ackTimeout0 = ackTimeout; - - int connectAttempts = 1; - - boolean joinReqSent = false; - - for (int i = 0; i < reconCnt; i++) { - // Need to set to false on each new iteration, - // since remote node may leave in the middle of the first iteration. - joinReqSent = false; - - boolean openSock = false; - - try { - long tstamp = U.currentTimeMillis(); - - sock = openSocket(addr); - - openSock = true; - - // Handshake. - writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId)); - - TcpDiscoveryHandshakeResponse res = readMessage(sock, null, ackTimeout0); - - if (locNodeId.equals(res.creatorNodeId())) { - if (log.isDebugEnabled()) - log.debug("Handshake response from local node: " + res); - - break; - } - - stats.onClientSocketInitialized(U.currentTimeMillis() - tstamp); - - // Send message. 
- tstamp = U.currentTimeMillis(); - - writeToSocket(sock, msg); - - stats.onMessageSent(msg, U.currentTimeMillis() - tstamp); - - if (debugMode) - debugLog("Message has been sent directly to address [msg=" + msg + ", addr=" + addr + - ", rmtNodeId=" + res.creatorNodeId() + ']'); - - if (log.isDebugEnabled()) - log.debug("Message has been sent directly to address [msg=" + msg + ", addr=" + addr + - ", rmtNodeId=" + res.creatorNodeId() + ']'); - - // Connection has been established, but - // join request may not be unmarshalled on remote host. - // E.g. due to class not found issue. - joinReqSent = msg instanceof TcpDiscoveryJoinRequestMessage; - - return readReceipt(sock, ackTimeout0); - } - catch (ClassCastException e) { - // This issue is rarely reproducible on AmazonEC2, but never - // on dedicated machines. - if (log.isDebugEnabled()) - U.error(log, "Class cast exception on direct send: " + addr, e); - - if (errs == null) - errs = new ArrayList<>(); - - errs.add(e); - } - catch (IOException | GridException e) { - if (log.isDebugEnabled()) - log.error("Exception on direct send: " + e.getMessage(), e); - - if (errs == null) - errs = new ArrayList<>(); - - errs.add(e); - - if (!openSock) { - // Reconnect for the second time, if connection is not established. - if (connectAttempts < 2) { - connectAttempts++; - - continue; - } - - break; // Don't retry if we can not establish connection. - } - - if (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class)) { - ackTimeout0 *= 2; - - if (!checkAckTimeout(ackTimeout0)) - break; - } - } - finally { - U.closeQuiet(sock); - } - } - - if (joinReqSent) { - if (log.isDebugEnabled()) - log.debug("Join request has been sent, but receipt has not been read (returning RES_WAIT)."); - - // Topology will not include this node, - // however, warning on timed out join will be output. 
- return RES_OK; - } - - throw new IgniteSpiException( - "Failed to send message to address [addr=" + addr + ", msg=" + msg + ']', - U.exceptionWithSuppressed("Failed to send message to address " + - "[addr=" + addr + ", msg=" + msg + ']', errs)); - } - - /** - * Marshalls credentials with discovery SPI marshaller (will replace attribute value). - * - * @param node Node to marshall credentials for. - * @throws org.apache.ignite.spi.IgniteSpiException If marshalling failed. - */ - private void marshalCredentials(TcpDiscoveryNode node) throws IgniteSpiException { - try { - // Use security-unsafe getter. - Map<String, Object> attrs = new HashMap<>(node.getAttributes()); - - attrs.put(GridNodeAttributes.ATTR_SECURITY_CREDENTIALS, - marsh.marshal(attrs.get(GridNodeAttributes.ATTR_SECURITY_CREDENTIALS))); - - node.setAttributes(attrs); - } - catch (GridException e) { - throw new IgniteSpiException("Failed to marshal node security credentials: " + node.id(), e); - } - } - - /** - * Unmarshalls credentials with discovery SPI marshaller (will not replace attribute value). - * - * @param node Node to unmarshall credentials for. - * @return Security credentials. - * @throws org.apache.ignite.spi.IgniteSpiException If unmarshal fails. - */ - private GridSecurityCredentials unmarshalCredentials(TcpDiscoveryNode node) throws IgniteSpiException { - try { - byte[] credBytes = (byte[])node.getAttributes().get(GridNodeAttributes.ATTR_SECURITY_CREDENTIALS); - - if (credBytes == null) - return null; - - return marsh.unmarshal(credBytes, null); - } - catch (GridException e) { - throw new IgniteSpiException("Failed to unmarshal node security credentials: " + node.id(), e); - } - } - - /** - * @param ackTimeout Acknowledgement timeout. - * @return {@code True} if acknowledgement timeout is less or equal to - * maximum acknowledgement timeout, {@code false} otherwise. 
- */ - private boolean checkAckTimeout(long ackTimeout) { - if (ackTimeout > maxAckTimeout) { - LT.warn(log, null, "Acknowledgement timeout is greater than maximum acknowledgement timeout " + - "(consider increasing 'maxAckTimeout' configuration property) " + - "[ackTimeout=" + ackTimeout + ", maxAckTimeout=" + maxAckTimeout + ']'); - - return false; - } - - return true; - } - - /** - * Notify external listener on discovery event. - * - * @param type Discovery event type. See {@link org.apache.ignite.events.IgniteDiscoveryEvent} for more details. - * @param topVer Topology version. - * @param node Remote node this event is connected with. - */ - private void notifyDiscovery(int type, long topVer, TcpDiscoveryNode node) { - assert type > 0; - assert node != null; - - DiscoverySpiListener lsnr = this.lsnr; - - TcpDiscoverySpiState spiState = spiStateCopy(); - - if (lsnr != null && node.visible() && (spiState == CONNECTED || spiState == DISCONNECTING)) { - if (log.isDebugEnabled()) - log.debug("Discovery notification [node=" + node + ", spiState=" + spiState + - ", type=" + U.gridEventName(type) + ", topVer=" + topVer + ']'); - - Collection<ClusterNode> top = F.<TcpDiscoveryNode, ClusterNode>upcast(ring.visibleNodes()); - - Map<Long, Collection<ClusterNode>> hist = updateTopologyHistory(topVer, top); - - lsnr.onDiscovery(type, topVer, node, top, hist); - } - else if (log.isDebugEnabled()) - log.debug("Skipped discovery notification [node=" + node + ", spiState=" + spiState + - ", type=" + U.gridEventName(type) + ", topVer=" + topVer + ']'); - } - - /** - * Update topology history with new topology snapshots. - * - * @param topVer Topology version. - * @param top Topology snapshot. - * @return Copy of updated topology history. 
- */ - @Nullable private Map<Long, Collection<ClusterNode>> updateTopologyHistory(long topVer, Collection<ClusterNode> top) { - synchronized (mux) { - if (topHist.containsKey(topVer)) - return null; - - topHist.put(topVer, top); - - while (topHist.size() > topHistSize) - topHist.remove(topHist.firstKey()); - - if (log.isDebugEnabled()) - log.debug("Added topology snapshot to history, topVer=" + topVer + ", historySize=" + topHist.size()); - - return new TreeMap<>(topHist); - } - } - - /** - * @param node Node. - * @return {@link LinkedHashSet} of internal and external addresses of provided node. - * Internal addresses placed before external addresses. - */ - @SuppressWarnings("TypeMayBeWeakened") - private LinkedHashSet<InetSocketAddress> getNodeAddresses(TcpDiscoveryNode node) { - LinkedHashSet<InetSocketAddress> res = new LinkedHashSet<>(node.socketAddresses()); - - Collection<InetSocketAddress> extAddrs = node.attribute(createSpiAttributeName(ATTR_EXT_ADDRS)); - - if (extAddrs != null) - res.addAll(extAddrs); - - return res; - } - - /** - * @param node Node. - * @param sameHost Same host flag. - * @return {@link LinkedHashSet} of internal and external addresses of provided node. - * Internal addresses placed before external addresses. - * Internal addresses will be sorted with {@code inetAddressesComparator(sameHost)}. - */ - @SuppressWarnings("TypeMayBeWeakened") - private LinkedHashSet<InetSocketAddress> getNodeAddresses(TcpDiscoveryNode node, boolean sameHost) { - List<InetSocketAddress> addrs = U.arrayList(node.socketAddresses()); - - Collections.sort(addrs, U.inetAddressesComparator(sameHost)); - - LinkedHashSet<InetSocketAddress> res = new LinkedHashSet<>(addrs); - - Collection<InetSocketAddress> extAddrs = node.attribute(createSpiAttributeName(ATTR_EXT_ADDRS)); - - if (extAddrs != null) - res.addAll(extAddrs); - - return res; - } - - /** - * Checks whether local node is coordinator. 
Nodes that are leaving or failed - * (but are still in topology) are removed from search. - * - * @return {@code true} if local node is coordinator. - */ - private boolean isLocalNodeCoordinator() { - synchronized (mux) { - boolean crd = spiState == CONNECTED && locNode.equals(resolveCoordinator()); - - if (crd) - stats.onBecomingCoordinator(); - - return crd; - } - } - - /** - * @return Spi state copy. - */ - private TcpDiscoverySpiState spiStateCopy() { - TcpDiscoverySpiState state; - - synchronized (mux) { - state = spiState; - } - - return state; - } - - /** - * Resolves coordinator. Nodes that are leaving or failed (but are still in - * topology) are removed from search. - * - * @return Coordinator node or {@code null} if there are no coordinator - * (i.e. local node is the last one and is currently stopping). - */ - @Nullable private TcpDiscoveryNode resolveCoordinator() { - return resolveCoordinator(null); - } - - /** - * Resolves coordinator. Nodes that are leaving or failed (but are still in - * topology) are removed from search as well as provided filter. - * - * @param filter Nodes to exclude when resolving coordinator (optional). - * @return Coordinator node or {@code null} if there are no coordinator - * (i.e. local node is the last one and is currently stopping). - */ - @Nullable private TcpDiscoveryNode resolveCoordinator( - @Nullable Collection<TcpDiscoveryNode> filter) { - synchronized (mux) { - Collection<TcpDiscoveryNode> excluded = F.concat(false, failedNodes, leavingNodes); - - if (!F.isEmpty(filter)) - excluded = F.concat(false, excluded, filter); - - return ring.coordinator(excluded); - } - } - - /** - * Prints SPI statistics. 
- */ - private void printStatistics() { - if (log.isInfoEnabled() && statsPrintFreq > 0) { - int failedNodesSize; - int leavingNodesSize; - - synchronized (mux) { - failedNodesSize = failedNodes.size(); - leavingNodesSize = leavingNodes.size(); - } - - Runtime runtime = Runtime.getRuntime(); - - TcpDiscoveryNode coord = resolveCoordinator(); - - log.info("Discovery SPI statistics [statistics=" + stats + ", spiState=" + spiStateCopy() + - ", coord=" + coord + - ", topSize=" + ring.allNodes().size() + - ", leavingNodesSize=" + leavingNodesSize + ", failedNodesSize=" + failedNodesSize + - ", msgWorker.queue.size=" + (msgWorker != null ? msgWorker.queueSize() : "N/A") + - ", lastUpdate=" + (locNode != null ? U.format(locNode.lastUpdateTime()) : "N/A") + - ", heapFree=" + runtime.freeMemory() / (1024 * 1024) + - "M, heapTotal=" + runtime.maxMemory() / (1024 * 1024) + "M]"); - } - } - - /** - * @param msg Message to prepare. - * @param destNodeId Destination node ID. - * @param msgs Messages to include. - * @param discardMsgId Discarded message ID. - */ - private void prepareNodeAddedMessage(TcpDiscoveryAbstractMessage msg, UUID destNodeId, - @Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable IgniteUuid discardMsgId) { - assert destNodeId != null; - - if (msg instanceof TcpDiscoveryNodeAddedMessage) { - TcpDiscoveryNodeAddedMessage nodeAddedMsg = (TcpDiscoveryNodeAddedMessage)msg; - - TcpDiscoveryNode node = nodeAddedMsg.node(); - - if (node.id().equals(destNodeId)) { - Collection<TcpDiscoveryNode> allNodes = ring.allNodes(); - Collection<TcpDiscoveryNode> topToSend = new ArrayList<>(allNodes.size()); - - for (TcpDiscoveryNode n0 : allNodes) { - assert n0.internalOrder() != 0 : n0; - - // Skip next node and nodes added after next - // in case this message is resent due to failures/leaves. - // There will be separate messages for nodes with greater - // internal order. 
- if (n0.internalOrder() < nodeAddedMsg.node().internalOrder()) - topToSend.add(n0); - } - - nodeAddedMsg.topology(topToSend); - nodeAddedMsg.messages(msgs, discardMsgId); - - Map<Long, Collection<ClusterNode>> hist; - - synchronized (mux) { - hist = new TreeMap<>(topHist); - } - - nodeAddedMsg.topologyHistory(hist); - } - } - } - - /** - * @param msg Message to clear. - */ - private void clearNodeAddedMessage(TcpDiscoveryAbstractMessage msg) { - if (msg instanceof TcpDiscoveryNodeAddedMessage) { - // Nullify topology before registration. - TcpDiscoveryNodeAddedMessage nodeAddedMsg = (TcpDiscoveryNodeAddedMessage)msg; - - nodeAddedMsg.topology(null); - nodeAddedMsg.topologyHistory(null); - nodeAddedMsg.messages(null, null); - } - } - - /** - * <strong>FOR TEST ONLY!!!</strong> - * <p> - * Simulates this node failure by stopping service threads. So, node will become - * unresponsive. - * <p> - * This method is intended for test purposes only. - */ - void simulateNodeFailure() { - U.warn(log, "Simulating node failure: " + locNodeId); - - U.interrupt(tcpSrvr); - U.join(tcpSrvr, log); - - U.interrupt(hbsSnd); - U.join(hbsSnd, log); - - U.interrupt(chkStatusSnd); - U.join(chkStatusSnd, log); - - U.interrupt(ipFinderCleaner); - U.join(ipFinderCleaner, log); - - Collection<SocketReader> tmp; - - synchronized (mux) { - tmp = U.arrayList(readers); - } - - U.interrupt(tmp); - U.joinThreads(tmp, log); - - U.interrupt(msgWorker); - U.join(msgWorker, log); - - U.interrupt(statsPrinter); - U.join(statsPrinter, log); - } - - /** - * <strong>FOR TEST ONLY!!!</strong> - * <p> - * Simulates situation when next node is still alive but is bypassed - * since it has been excluded from the ring, possibly, due to short time - * network problems. - * <p> - * This method is intended for test purposes only. 
- */ - void forceNextNodeFailure() { - U.warn(log, "Next node will be forcibly failed (if any)."); - - TcpDiscoveryNode next; - - synchronized (mux) { - next = ring.nextNode(failedNodes); - } - - if (next != null) - msgWorker.addMessage(new TcpDiscoveryNodeFailedMessage(locNodeId, next.id(), next.internalOrder())); - } - - /** - * <strong>FOR TEST ONLY!!!</strong> - * <p> - * This method is intended for test purposes only. - * - * @param msg Message. - */ - void onBeforeMessageSentAcrossRing(Serializable msg) { - // No-op. - } - - /** - * <strong>FOR TEST ONLY!!!</strong> - * <p> - * This method is intended for test purposes only. - * - * @return Nodes ring. - */ - TcpDiscoveryNodesRing ring() { - return ring; - } - - /** {@inheritDoc} */ - @Override public void dumpDebugInfo() { - dumpDebugInfo(log); - } - - /** - * @param log Logger. - */ - public void dumpDebugInfo(IgniteLogger log) { - if (!debugMode) { - U.quietAndWarn(log, "Failed to dump debug info (discovery SPI was not configured " + - "in debug mode, consider setting 'debugMode' configuration property to 'true')."); - - return; - } - - assert log.isInfoEnabled(); - - synchronized (mux) { - StringBuilder b = new StringBuilder(U.nl()); - - b.append(">>>").append(U.nl()); - b.append(">>>").append("Dumping discovery SPI debug info.").append(U.nl()); - b.append(">>>").append(U.nl()); - - b.append("Local node ID: ").append(locNodeId).append(U.nl()).append(U.nl()); - b.append("Local node: ").append(locNode).append(U.nl()).append(U.nl()); - b.append("SPI state: ").append(spiState).append(U.nl()).append(U.nl()); - - b.append("Internal threads: ").append(U.nl()); - - b.append(" Message worker: ").append(threadStatus(msgWorker)).append(U.nl()); - b.append(" Check status sender: ").append(threadStatus(chkStatusSnd)).append(U.nl()); - b.append(" HB sender: ").append(threadStatus(hbsSnd)).append(U.nl()); - b.append(" Socket timeout worker: ").append(threadStatus(sockTimeoutWorker)).append(U.nl()); - b.append(" IP 
finder cleaner: ").append(threadStatus(ipFinderCleaner)).append(U.nl()); - b.append(" Stats printer: ").append(threadStatus(statsPrinter)).append(U.nl()); - - b.append(U.nl()); - - b.append("Socket readers: ").append(U.nl()); - - for (SocketReader rdr : readers) - b.append(" ").append(rdr).append(U.nl()); - - b.append(U.nl()); - - b.append("In-memory log messages: ").append(U.nl()); - - for (String msg : debugLog) - b.append(" ").append(msg).append(U.nl()); - - b.append(U.nl()); - - b.append("Leaving nodes: ").append(U.nl()); - - for (TcpDiscoveryNode node : leavingNodes) - b.append(" ").append(node.id()).append(U.nl()); - - b.append(U.nl()); - - b.append("Failed nodes: ").append(U.nl()); - - for (TcpDiscoveryNode node : failedNodes) - b.append(" ").append(node.id()).append(U.nl()); - - b.append(U.nl()); - - b.append("Stats: ").append(stats).append(U.nl()); - - U.quietAndInfo(log, b.toString()); - } - } - - /** - * @param msg Message. - */ - private void debugLog(String msg) { - assert debugMode; - - String msg0 = new SimpleDateFormat("[HH:mm:ss,SSS]").format(new Date(System.currentTimeMillis())) + - '[' + Thread.currentThread().getName() + "][" + locNodeId + "-" + locNode.internalOrder() + "] " + - msg; - - debugLog.add(msg0); - - int delta = debugLog.size() - debugMsgHist; - - for (int i = 0; i < delta && debugLog.size() > debugMsgHist; i++) - debugLog.poll(); - } - - /** - * @param msg Message. - * @return {@code True} if recordable in debug mode. - */ - private boolean recordable(TcpDiscoveryAbstractMessage msg) { - return !(msg instanceof TcpDiscoveryHeartbeatMessage) && - !(msg instanceof TcpDiscoveryStatusCheckMessage) && - !(msg instanceof TcpDiscoveryDiscardMessage); - } - - /** - * @param t Thread. - * @return Status as string. - */ - private String threadStatus(Thread t) { - if (t == null) - return "N/A"; - - return t.isAlive() ? "alive" : "dead"; - } - - /** - * Checks if two given {@link GridSecurityPermissionSet} objects contain the same permissions. 
- * Each permission belongs to one of three groups : cache, task or system. - * - * @param locPerms The first set of permissions. - * @param rmtPerms The second set of permissions. - * @return {@code True} if given parameters contain the same permissions, {@code False} otherwise. - */ - private boolean permissionsEqual(GridSecurityPermissionSet locPerms, GridSecurityPermissionSet rmtPerms) { - boolean dfltAllowMatch = !(locPerms.defaultAllowAll() ^ rmtPerms.defaultAllowAll()); - - boolean bothHaveSamePerms = F.eqNotOrdered(rmtPerms.systemPermissions(), locPerms.systemPermissions()) && - F.eqNotOrdered(rmtPerms.cachePermissions(), locPerms.cachePermissions()) && - F.eqNotOrdered(rmtPerms.taskPermissions(), locPerms.taskPermissions()); - - return dfltAllowMatch && bothHaveSamePerms; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(TcpDiscoverySpi.class, this); - } - - /** - * Thread that sends heartbeats. - */ - private class HeartbeatsSender extends IgniteSpiThread { - /** - * Constructor. 
- */ - private HeartbeatsSender() { - super(gridName, "tcp-disco-hb-sender", log); - - setPriority(threadPri); - } - - /** {@inheritDoc} */ - @SuppressWarnings("BusyWait") - @Override protected void body() throws InterruptedException { - while (!isLocalNodeCoordinator()) - Thread.sleep(1000); - - if (log.isDebugEnabled()) - log.debug("Heartbeats sender has been started."); - - while (!isInterrupted()) { - if (spiStateCopy() != CONNECTED) { - if (log.isDebugEnabled()) - log.debug("Stopping heartbeats sender (SPI is not connected to topology)."); - - return; - } - - TcpDiscoveryHeartbeatMessage msg = new TcpDiscoveryHeartbeatMessage(locNodeId); - - msg.verify(locNodeId); - - msgWorker.addMessage(msg); - - Thread.sleep(hbFreq); - } - } - } - - /** - * Thread that sends status check messages to next node if local node has not - * been receiving heartbeats ({@link org.gridgain.grid.spi.discovery.tcp.messages.TcpDiscoveryHeartbeatMessage}) - * for {@link TcpDiscoverySpi#getMaxMissedHeartbeats()} * - * {@link TcpDiscoverySpi#getHeartbeatFrequency()}. - */ - private class CheckStatusSender extends IgniteSpiThread { - /** - * Constructor. - */ - private CheckStatusSender() { - super(gridName, "tcp-disco-status-check-sender", log); - - setPriority(threadPri); - } - - /** {@inheritDoc} */ - @SuppressWarnings("BusyWait") - @Override protected void body() throws InterruptedException { - if (log.isDebugEnabled()) - log.debug("Status check sender has been started."); - - // Only 1 heartbeat missing is acceptable. 1 sec is added to avoid false alarm. - long checkTimeout = (long)maxMissedHbs * hbFreq + 1000; - - long lastSent = 0; - - while (!isInterrupted()) { - // 1. Determine timeout. - if (lastSent < locNode.lastUpdateTime()) - lastSent = locNode.lastUpdateTime(); - - long timeout = (lastSent + checkTimeout) - U.currentTimeMillis(); - - if (timeout > 0) - Thread.sleep(timeout); - - // 2. Check if SPI is still connected. 
- if (spiStateCopy() != CONNECTED) { - if (log.isDebugEnabled()) - log.debug("Stopping status check sender (SPI is not connected to topology)."); - - return; - } - - // 3. Was there an update? - if (locNode.lastUpdateTime() > lastSent || !ring.hasRemoteNodes()) { - if (log.isDebugEnabled()) - log.debug("Skipping status check send " + - "[locNodeLastUpdate=" + U.format(locNode.lastUpdateTime()) + - ", hasRmts=" + ring.hasRemoteNodes() + ']'); - - continue; - } - - // 4. Send status check message. - lastSent = U.currentTimeMillis(); - - msgWorker.addMessage(new TcpDiscoveryStatusCheckMessage(locNode, null)); - } - } - } - - /** - * Thread that cleans IP finder and keeps it in the correct state, unregistering - * addresses of the nodes that has left the topology. - * <p> - * This thread should run only on coordinator node and will clean IP finder - * if and only if {@link org.gridgain.grid.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder#isShared()} is {@code true}. - */ - private class IpFinderCleaner extends IgniteSpiThread { - /** - * Constructor. - */ - private IpFinderCleaner() { - super(gridName, "tcp-disco-ip-finder-cleaner", log); - - setPriority(threadPri); - } - - /** {@inheritDoc} */ - @SuppressWarnings("BusyWait") - @Override protected void body() throws InterruptedException { - if (log.isDebugEnabled()) - log.debug("IP finder cleaner has been started."); - - while (!isInterrupted()) { - Thread.sleep(ipFinderCleanFreq); - - if (!isLocalNodeCoordinator()) - continue; - - if (spiStateCopy() != CONNECTED) { - if (log.isDebugEnabled()) - log.debug("Stopping IP finder cleaner (SPI is not connected to topology)."); - - return; - } - - if (ipFinder.isShared()) - cleanIpFinder(); - } - } - - /** - * Cleans IP finder. - */ - private void cleanIpFinder() { - assert ipFinder.isShared(); - - try { - // Addresses that belongs to nodes in topology. 
- Collection<InetSocketAddress> currAddrs = F.flatCollections( - F.viewReadOnly( - ring.allNodes(), - new C1<TcpDiscoveryNode, Collection<InetSocketAddress>>() { - @Override public Collection<InetSocketAddress> apply(TcpDiscoveryNode node) { - return !node.isClient() ? getNodeAddresses(node) : - Collections.<InetSocketAddress>emptyList(); - } - } - ) - ); - - // Addresses registered in IP finder. - Collection<InetSocketAddress> regAddrs = registeredAddresses(); - - // Remove all addresses that belong to alive nodes, leave dead-node addresses. - Collection<InetSocketAddress> rmvAddrs = F.view( - regAddrs, - F.notContains(currAddrs), - new P1<InetSocketAddress>() { - private final Map<InetSocketAddress, Boolean> pingResMap = - new HashMap<>(); - - @Override public boolean apply(InetSocketAddress addr) { - Boolean res = pingResMap.get(addr); - - if (res == null) { - try { - res = pingNode(addr, null).get1() != null; - } - catch (GridException e) { - if (log.isDebugEnabled()) - log.debug("Failed to ping node [addr=" + addr + - ", err=" + e.getMessage() + ']'); - - res = false; - } - finally { - pingResMap.put(addr, res); - } - } - - return !res; - } - } - ); - - // Unregister dead-nodes addresses. - if (!rmvAddrs.isEmpty()) { - ipFinder.unregisterAddresses(rmvAddrs); - - if (log.isDebugEnabled()) - log.debug("Unregistered addresses from IP finder: " + rmvAddrs); - } - - // Addresses that were removed by mistake (e.g. on segmentation). - Collection<InetSocketAddress> missingAddrs = F.view( - currAddrs, - F.notContains(regAddrs) - ); - - // Re-register missing addresses. - if (!missingAddrs.isEmpty()) { - ipFinder.registerAddresses(missingAddrs); - - if (log.isDebugEnabled()) - log.debug("Registered missing addresses in IP finder: " + missingAddrs); - } - } - catch (IgniteSpiException e) { - LT.error(log, e, "Failed to clean IP finder up."); - } - } - } - - /** - * Pending messages container. 
- */ - private static class PendingMessages { - /** */ - private static final int MAX = 1024; - - /** Pending messages. */ - private final Queue<TcpDiscoveryAbstractMessage> msgs = new ArrayDeque<>(MAX * 2); - - /** Discarded message ID. */ - private IgniteUuid discardId; - - /** - * Adds pending message and shrinks queue if it exceeds limit - * (messages that were not discarded yet are never removed). - * - * @param msg Message to add. - */ - void add(TcpDiscoveryAbstractMessage msg) { - msgs.add(msg); - - while (msgs.size() > MAX) { - TcpDiscoveryAbstractMessage polled = msgs.poll(); - - assert polled != null; - - if (polled.id().equals(discardId)) - break; - } - } - - /** - * Gets messages starting from provided ID (exclusive). If such - * message is not found, {@code null} is returned (this indicates - * a failure condition when it was already removed from queue). - * - * @param lastMsgId Last message ID. - * @return Collection of messages. - */ - @Nullable Collection<TcpDiscoveryAbstractMessage> messages(IgniteUuid lastMsgId) { - assert lastMsgId != null; - - Collection<TcpDiscoveryAbstractMessage> copy = new ArrayList<>(msgs.size()); - - boolean skip = true; - - for (TcpDiscoveryAbstractMessage msg : msgs) { - if (skip) { - if (msg.id().equals(lastMsgId)) - skip = false; - } - else - copy.add(msg); - } - - return !skip ? copy : null; - } - - /** - * Resets pending messages. - * - * @param msgs Message. - * @param discardId Discarded message ID. - */ - void reset(@Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable IgniteUuid discardId) { - this.msgs.clear(); - - if (msgs != null) - this.msgs.addAll(msgs); - - this.discardId = discardId; - } - - /** - * Clears pending messages. - */ - void clear() { - msgs.clear(); - - discardId = null; - } - - /** - * Discards message with provided ID and all before it. - * - * @param id Discarded message ID. - */ - void discard(IgniteUuid id) { - discardId = id; - } - } - - /** - * Message worker thread for mess
<TRUNCATED>