http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/838c0fd8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 8ceac1c..2b2c691 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -18,22 +18,17 @@ package org.apache.ignite.spi.discovery.tcp; import org.apache.ignite.*; -import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; -import org.apache.ignite.events.*; import org.apache.ignite.internal.*; -import org.apache.ignite.internal.events.*; -import org.apache.ignite.internal.processors.security.*; import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.io.*; -import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; -import org.apache.ignite.plugin.security.*; +import org.apache.ignite.marshaller.*; +import org.apache.ignite.marshaller.jdk.*; import org.apache.ignite.resources.*; import org.apache.ignite.spi.*; import org.apache.ignite.spi.discovery.*; @@ -45,22 +40,13 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.sharedfs.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; import org.apache.ignite.spi.discovery.tcp.messages.*; import org.jetbrains.annotations.*; -import org.jsr166.*; import java.io.*; import java.net.*; -import java.text.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; -import static org.apache.ignite.events.EventType.*; -import static org.apache.ignite.internal.IgniteNodeAttributes.*; -import static org.apache.ignite.spi.IgnitePortProtocol.*; -import static org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoverySpiState.*; -import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHeartbeatMessage.*; -import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessage.*; - /** * Discovery SPI implementation that uses TCP/IP for node discovery. * <p> @@ -150,10 +136,43 @@ import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusChe @IgniteSpiMultipleInstancesSupport(true) @DiscoverySpiOrderSupport(true) @DiscoverySpiHistorySupport(true) -public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscoverySpiMBean { +public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, TcpDiscoverySpiMBean { + /** Node attribute that is mapped to node's external addresses (value is <tt>disc.tcp.ext-addrs</tt>). */ + public static final String ATTR_EXT_ADDRS = "disc.tcp.ext-addrs"; + /** Default local port range (value is <tt>100</tt>). */ public static final int DFLT_PORT_RANGE = 100; + /** Default port to listen (value is <tt>47500</tt>). */ + public static final int DFLT_PORT = 47500; + + /** Default timeout for joining topology (value is <tt>0</tt>). */ + public static final long DFLT_JOIN_TIMEOUT = 0; + + /** Default network timeout in milliseconds (value is <tt>5000ms</tt>). */ + public static final long DFLT_NETWORK_TIMEOUT = 5000; + + /** Default value for thread priority (value is <tt>10</tt>). */ + public static final int DFLT_THREAD_PRI = 10; + + /** Default heartbeat messages issuing frequency (value is <tt>100ms</tt>). */ + public static final long DFLT_HEARTBEAT_FREQ = 100; + + /** Default size of topology snapshots history. */ + public static final int DFLT_TOP_HISTORY_SIZE = 1000; + + /** Default socket operations timeout in milliseconds (value is <tt>200ms</tt>). */ + public static final long DFLT_SOCK_TIMEOUT = 200; + + /** Default timeout for receiving message acknowledgement in milliseconds (value is <tt>50ms</tt>). */ + public static final long DFLT_ACK_TIMEOUT = 50; + + /** Default socket operations timeout in milliseconds (value is <tt>700ms</tt>). */ + public static final long DFLT_SOCK_TIMEOUT_CLIENT = 700; + + /** Default timeout for receiving message acknowledgement in milliseconds (value is <tt>700ms</tt>). */ + public static final long DFLT_ACK_TIMEOUT_CLIENT = 700; + /** Default reconnect attempts count (value is <tt>10</tt>). */ public static final int DFLT_RECONNECT_CNT = 10; @@ -172,154 +191,236 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov /** Maximum ack timeout value for receiving message acknowledgement in milliseconds (value is <tt>600,000ms</tt>). */ public static final long DFLT_MAX_ACK_TIMEOUT = 10 * 60 * 1000; - /** Default socket operations timeout in milliseconds (value is <tt>200ms</tt>). */ - public static final long DFLT_SOCK_TIMEOUT = 200; - - /** Default timeout for receiving message acknowledgement in milliseconds (value is <tt>50ms</tt>). */ - public static final long DFLT_ACK_TIMEOUT = 50; - - /** Node attribute that is mapped to node's external addresses (value is <tt>disc.tcp.ext-addrs</tt>). */ - public static final String ATTR_EXT_ADDRS = "disc.tcp.ext-addrs"; + /** Local address. */ + protected String locAddr; /** Address resolver. */ private AddressResolver addrRslvr; - /** Local port which node uses. */ - private int locPort = DFLT_PORT; + /** IP finder. */ + protected TcpDiscoveryIpFinder ipFinder; - /** Local port range. */ - private int locPortRange = DFLT_PORT_RANGE; + /** Socket operations timeout. */ + protected long sockTimeout; // Must be initialized in the constructor of child class. - /** Statistics print frequency. */ - @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized", "RedundantFieldInitialization"}) - private long statsPrintFreq = DFLT_STATS_PRINT_FREQ; + /** Message acknowledgement timeout. */ + protected long ackTimeout; // Must be initialized in the constructor of child class. - /** Maximum message acknowledgement timeout. */ - private long maxAckTimeout = DFLT_MAX_ACK_TIMEOUT; + /** Network timeout. */ + protected long netTimeout = DFLT_NETWORK_TIMEOUT; - /** Max heartbeats count node can miss without initiating status check. */ - private int maxMissedHbs = DFLT_MAX_MISSED_HEARTBEATS; + /** Join timeout. */ + @SuppressWarnings("RedundantFieldInitialization") + protected long joinTimeout = DFLT_JOIN_TIMEOUT; - /** Max heartbeats count node can miss without failing client node. */ - private int maxMissedClientHbs = DFLT_MAX_MISSED_CLIENT_HEARTBEATS; + /** Thread priority for all threads started by SPI. */ + protected int threadPri = DFLT_THREAD_PRI; - /** IP finder clean frequency. */ - @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) - private long ipFinderCleanFreq = DFLT_IP_FINDER_CLEAN_FREQ; + /** Heartbeat messages issuing frequency. */ + protected long hbFreq = DFLT_HEARTBEAT_FREQ; - /** Reconnect attempts count. */ - @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) - private int reconCnt = DFLT_RECONNECT_CNT; + /** Size of topology snapshots history. */ + protected int topHistSize = DFLT_TOP_HISTORY_SIZE; - /** */ - private final Executor utilityPool = new ThreadPoolExecutor(0, 1, 2000, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<Runnable>()); + /** Grid discovery listener. */ + protected volatile DiscoverySpiListener lsnr; - /** Nodes ring. */ - @GridToStringExclude - private final TcpDiscoveryNodesRing ring = new TcpDiscoveryNodesRing(); + /** Data exchange. */ + protected DiscoverySpiDataExchange exchange; + + /** Metrics provider. */ + protected DiscoveryMetricsProvider metricsProvider; - /** Topology snapshots history. */ - private final SortedMap<Long, Collection<ClusterNode>> topHist = new TreeMap<>(); + /** Local node attributes. */ + protected Map<String, Object> locNodeAttrs; - /** Socket readers. */ - private final Collection<SocketReader> readers = new LinkedList<>(); + /** Local node version. */ + protected IgniteProductVersion locNodeVer; - /** TCP server for discovery SPI. */ - private TcpServer tcpSrvr; + /** Local node. */ + protected TcpDiscoveryNode locNode; - /** Message worker. */ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private RingMessageWorker msgWorker; + /** Local host. */ + protected InetAddress locHost; - /** Client message workers. */ - private ConcurrentMap<UUID, ClientMessageWorker> clientMsgWorkers = new ConcurrentHashMap8<>(); + /** Internal and external addresses of local node. */ + protected Collection<InetSocketAddress> locNodeAddrs; - /** Metrics sender. */ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private HeartbeatsSender hbsSnd; + /** Socket timeout worker. */ + protected SocketTimeoutWorker sockTimeoutWorker; - /** Status checker. */ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private CheckStatusSender chkStatusSnd; + /** Start time of the very first grid node. */ + protected volatile long gridStartTime; - /** IP finder cleaner. */ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private IpFinderCleaner ipFinderCleaner; + /** Marshaller. */ + protected final Marshaller marsh = new JdkMarshaller(); - /** Statistics printer thread. */ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private StatisticsPrinter statsPrinter; + /** Statistics. */ + protected final TcpDiscoveryStatistics stats = new TcpDiscoveryStatistics(); - /** Failed nodes (but still in topology). */ - private Collection<TcpDiscoveryNode> failedNodes = new HashSet<>(); + /** Local port which node uses. */ + protected int locPort = DFLT_PORT; + + /** Local port range. */ + protected int locPortRange = DFLT_PORT_RANGE; + + /** Reconnect attempts count. */ + @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) + protected int reconCnt = DFLT_RECONNECT_CNT; - /** Leaving nodes (but still in topology). */ - private Collection<TcpDiscoveryNode> leavingNodes = new HashSet<>(); + /** Statistics print frequency. */ + @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized", "RedundantFieldInitialization"}) + protected long statsPrintFreq = DFLT_STATS_PRINT_FREQ; - /** If non-shared IP finder is used this flag shows whether IP finder contains local address. */ - private boolean ipFinderHasLocAddr; + /** Maximum message acknowledgement timeout. */ + protected long maxAckTimeout = DFLT_MAX_ACK_TIMEOUT; - /** Addresses that do not respond during join requests send (for resolving concurrent start). */ - private final Collection<SocketAddress> noResAddrs = new GridConcurrentHashSet<>(); + /** Max heartbeats count node can miss without initiating status check. */ + protected int maxMissedHbs = DFLT_MAX_MISSED_HEARTBEATS; - /** Addresses that incoming join requests send were send from (for resolving concurrent start). */ - private final Collection<SocketAddress> fromAddrs = new GridConcurrentHashSet<>(); + /** Max heartbeats count node can miss without failing client node. */ + protected int maxMissedClientHbs = DFLT_MAX_MISSED_CLIENT_HEARTBEATS; - /** Response on join request from coordinator (in case of duplicate ID or auth failure). */ - private final GridTuple<TcpDiscoveryAbstractMessage> joinRes = F.t1(); + /** IP finder clean frequency. */ + @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) + protected long ipFinderCleanFreq = DFLT_IP_FINDER_CLEAN_FREQ; /** Context initialization latch. */ @GridToStringExclude private final CountDownLatch ctxInitLatch = new CountDownLatch(1); - /** Node authenticator. */ - private DiscoverySpiNodeAuthenticator nodeAuth; + /** */ + protected final CopyOnWriteArrayList<IgniteInClosure<TcpDiscoveryAbstractMessage>> sendMsgLsnrs = + new CopyOnWriteArrayList<>(); + + /** */ + protected final CopyOnWriteArrayList<IgniteInClosure<Socket>> incomeConnLsnrs = + new CopyOnWriteArrayList<>(); + + /** Logger. */ + @LoggerResource + protected IgniteLogger log; + + /** */ + protected TcpDiscoveryImpl impl; + + /** */ + private boolean clientMode; + + /** {@inheritDoc} */ + @Override public String getSpiState() { + return impl.getSpiState(); + } + + /** {@inheritDoc} */ + @Override public int getMessageWorkerQueueSize() { + return impl.getMessageWorkerQueueSize(); + } - /** Mutex. */ - private final Object mux = new Object(); + /** {@inheritDoc} */ + @Nullable @Override public UUID getCoordinator() { + return impl.getCoordinator(); + } - /** Discovery state. */ - protected TcpDiscoverySpiState spiState = DISCONNECTED; + /** {@inheritDoc} */ + @Override public Collection<ClusterNode> getRemoteNodes() { + return impl.getRemoteNodes(); + } - /** Map with proceeding ping requests. */ - private final ConcurrentMap<InetSocketAddress, IgniteInternalFuture<IgniteBiTuple<UUID, Boolean>>> pingMap = - new ConcurrentHashMap8<>(); + /** {@inheritDoc} */ + @Nullable @Override public ClusterNode getNode(UUID nodeId) { + return impl.getNode(nodeId); + } - /** Debug mode. */ - private boolean debugMode; + /** {@inheritDoc} */ + @Override public boolean pingNode(UUID nodeId) { + return impl.pingNode(nodeId); + } - /** Debug messages history. */ - private int debugMsgHist = 512; + /** {@inheritDoc} */ + @Override public void disconnect() throws IgniteSpiException { + impl.disconnect(); + } - /** Received messages. */ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private ConcurrentLinkedDeque<String> debugLog; + /** {@inheritDoc} */ + @Override public void setAuthenticator(DiscoverySpiNodeAuthenticator auth) { + impl.setAuthenticator(auth); + } - /** */ - private final CopyOnWriteArrayList<IgniteInClosure<TcpDiscoveryAbstractMessage>> sendMsgLsnrs = - new CopyOnWriteArrayList<>(); + /** {@inheritDoc} */ + @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException { + impl.sendCustomEvent(msg); + } - /** */ - private final CopyOnWriteArrayList<IgniteInClosure<Socket>> incomeConnLsnrs = - new CopyOnWriteArrayList<>(); + /** {@inheritDoc} */ + @Override public void failNode(UUID nodeId) { + impl.failNode(nodeId); + } + + /** {@inheritDoc} */ + @Override public void dumpDebugInfo() { + impl.dumpDebugInfo(log); + } + + /** {@inheritDoc} */ + @Override public boolean isClientMode() { + return clientMode; + } /** - * Default constructor. + * @param clientMode New client mode. */ - public TcpDiscoverySpi() { - ackTimeout = DFLT_ACK_TIMEOUT; - sockTimeout = DFLT_SOCK_TIMEOUT; + @IgniteSpiConfiguration(optional = true) + public TcpDiscoverySpi setClientMode(boolean clientMode) { + if (impl != null) + throw new IllegalStateException("You cannot change mode, TcpDiscoverySpi already started."); + + this.clientMode = clientMode; + + return this; } - /** {@inheritDoc} */ + /** + * Inject resources + * + * @param ignite Ignite. + */ @IgniteInstanceResource - @Override public void injectResources(Ignite ignite) { + @Override protected void injectResources(Ignite ignite) { super.injectResources(ignite); // Inject resource. - if (ignite != null) + if (ignite != null) { + setLocalAddress(ignite.configuration().getLocalHost()); setAddressResolver(ignite.configuration().getAddressResolver()); + } + } + + /** + * Sets local host IP address that discovery SPI uses. + * <p> + * If not provided, by default a first found non-loopback address + * will be used. If there is no non-loopback address available, + * then {@link InetAddress#getLocalHost()} will be used. + * + * @param locAddr IP address. + */ + @IgniteSpiConfiguration(optional = true) + public TcpDiscoverySpi setLocalAddress(String locAddr) { + // Injection should not override value already set by Spring or user. + if (this.locAddr == null) + this.locAddr = locAddr; + + return this; + } + + /** + * Gets local address that was set to SPI with {@link #setLocalAddress(String)} method. + * + * @return local address. + */ + public String getLocalAddress() { + return locAddr; } /** @@ -360,8 +461,10 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov * @see #setAckTimeout(long) */ @IgniteSpiConfiguration(optional = true) - public void setReconnectCount(int reconCnt) { + public TcpDiscoverySpi setReconnectCount(int reconCnt) { this.reconCnt = reconCnt; + + return this; } /** {@inheritDoc} */ @@ -382,8 +485,10 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov * @param maxAckTimeout Maximum acknowledgement timeout. */ @IgniteSpiConfiguration(optional = true) - public void setMaxAckTimeout(long maxAckTimeout) { + public TcpDiscoverySpi setMaxAckTimeout(long maxAckTimeout) { this.maxAckTimeout = maxAckTimeout; + + return this; } /** {@inheritDoc} */ @@ -401,8 +506,10 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov * @param locPort Local port to bind. */ @IgniteSpiConfiguration(optional = true) - public void setLocalPort(int locPort) { + public TcpDiscoverySpi setLocalPort(int locPort) { this.locPort = locPort; + + return this; } /** {@inheritDoc} */ @@ -420,8 +527,10 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov * @param locPortRange Local port range to bind. */ @IgniteSpiConfiguration(optional = true) - public void setLocalPortRange(int locPortRange) { + public TcpDiscoverySpi setLocalPortRange(int locPortRange) { this.locPortRange = locPortRange; + + return this; } /** {@inheritDoc} */ @@ -437,8 +546,10 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov * @param maxMissedHbs Max missed heartbeats. */ @IgniteSpiConfiguration(optional = true) - public void setMaxMissedHeartbeats(int maxMissedHbs) { + public TcpDiscoverySpi setMaxMissedHeartbeats(int maxMissedHbs) { this.maxMissedHbs = maxMissedHbs; + + return this; } /** {@inheritDoc} */ @@ -454,8 +565,10 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov * @param maxMissedClientHbs Max missed client heartbeats. */ @IgniteSpiConfiguration(optional = true) - public void setMaxMissedClientHeartbeats(int maxMissedClientHbs) { + public TcpDiscoverySpi setMaxMissedClientHeartbeats(int maxMissedClientHbs) { this.maxMissedClientHbs = maxMissedClientHbs; + + return this; } /** {@inheritDoc} */ @@ -475,8 +588,10 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov * @param statsPrintFreq Statistics print frequency in milliseconds. */ @IgniteSpiConfiguration(optional = true) - public void setStatisticsPrintFrequency(long statsPrintFreq) { + public TcpDiscoverySpi setStatisticsPrintFrequency(long statsPrintFreq) { this.statsPrintFreq = statsPrintFreq; + + return this; } /** {@inheritDoc} */ @@ -492,112 +607,187 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov * @param ipFinderCleanFreq IP finder clean frequency. */ @IgniteSpiConfiguration(optional = true) - public void setIpFinderCleanFrequency(long ipFinderCleanFreq) { + public TcpDiscoverySpi setIpFinderCleanFrequency(long ipFinderCleanFreq) { this.ipFinderCleanFreq = ipFinderCleanFreq; + + return this; } /** - * This method is intended for troubleshooting purposes only. + * Gets IP finder for IP addresses sharing and storing. * - * @param debugMode {code True} to start SPI in debug mode. + * @return IP finder for IP addresses sharing and storing. */ - public void setDebugMode(boolean debugMode) { - this.debugMode = debugMode; + public TcpDiscoveryIpFinder getIpFinder() { + return ipFinder; } /** - * This method is intended for troubleshooting purposes only. + * Sets IP finder for IP addresses sharing and storing. + * <p> + * If not provided {@link org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder} will be used by default. * - * @param debugMsgHist Message history log size. + * @param ipFinder IP finder. */ - public void setDebugMessageHistory(int debugMsgHist) { - this.debugMsgHist = debugMsgHist; + @IgniteSpiConfiguration(optional = true) + public TcpDiscoverySpi setIpFinder(TcpDiscoveryIpFinder ipFinder) { + this.ipFinder = ipFinder; + + return this; } - /** {@inheritDoc} */ - @Override public String getSpiState() { - synchronized (mux) { - return spiState.name(); - } + /** + * Sets socket operations timeout. This timeout is used to limit connection time and + * write-to-socket time. + * <p> + * Note that when running Ignite on Amazon EC2, socket timeout must be set to a value + * significantly greater than the default (e.g. to {@code 30000}). + * <p> + * If not specified, default is {@link #DFLT_SOCK_TIMEOUT} or {@link #DFLT_SOCK_TIMEOUT_CLIENT}. + * + * @param sockTimeout Socket connection timeout. + */ + @IgniteSpiConfiguration(optional = true) + public TcpDiscoverySpi setSocketTimeout(long sockTimeout) { + this.sockTimeout = sockTimeout; + + return this; } - /** {@inheritDoc} */ - @Override public int getMessageWorkerQueueSize() { - return msgWorker.queueSize(); + /** + * Sets timeout for receiving acknowledgement for sent message. + * <p> + * If acknowledgement is not received within this timeout, sending is considered as failed + * and SPI tries to repeat message sending. + * <p> + * If not specified, default is {@link #DFLT_ACK_TIMEOUT} or {@link #DFLT_ACK_TIMEOUT_CLIENT}. + * + * @param ackTimeout Acknowledgement timeout. + */ + @IgniteSpiConfiguration(optional = true) + public TcpDiscoverySpi setAckTimeout(long ackTimeout) { + this.ackTimeout = ackTimeout; + + return this; } - /** {@inheritDoc} */ - @Nullable @Override public UUID getCoordinator() { - TcpDiscoveryNode crd = resolveCoordinator(); + /** + * Sets maximum network timeout to use for network operations. + * <p> + * If not specified, default is {@link #DFLT_NETWORK_TIMEOUT}. + * + * @param netTimeout Network timeout. + */ + @IgniteSpiConfiguration(optional = true) + public TcpDiscoverySpi setNetworkTimeout(long netTimeout) { + this.netTimeout = netTimeout; - return crd != null ? crd.id() : null; + return this; } /** {@inheritDoc} */ - @Nullable @Override public ClusterNode getNode(UUID nodeId) { - assert nodeId != null; - - UUID locNodeId0 = getLocalNodeId(); + @Override public long getJoinTimeout() { + return joinTimeout; + } - if (locNodeId0 != null && locNodeId0.equals(nodeId)) - // Return local node directly. - return locNode; + /** + * Sets join timeout. + * <p> + * If non-shared IP finder is used and node fails to connect to + * any address from IP finder, node keeps trying to join within this + * timeout. If all addresses are still unresponsive, exception is thrown + * and node startup fails. + * <p> + * If not specified, default is {@link #DFLT_JOIN_TIMEOUT}. + * + * @param joinTimeout Join timeout ({@code 0} means wait forever). + * + * @see TcpDiscoveryIpFinder#isShared() + */ + @IgniteSpiConfiguration(optional = true) + public TcpDiscoverySpi setJoinTimeout(long joinTimeout) { + this.joinTimeout = joinTimeout; - TcpDiscoveryNode node = ring.node(nodeId); + return this; + } - if (node != null && !node.visible()) - return null; + /** + * Sets thread priority. All threads within SPI will be started with it. + * <p> + * If not provided, default value is {@link #DFLT_THREAD_PRI} + * + * @param threadPri Thread priority. + */ + @IgniteSpiConfiguration(optional = true) + public TcpDiscoverySpi setThreadPriority(int threadPri) { + this.threadPri = threadPri; - return node; + return this; } - /** {@inheritDoc} */ - @Override public Collection<ClusterNode> getRemoteNodes() { - return F.upcast(ring.visibleRemoteNodes()); + /** + * Sets delay between issuing of heartbeat messages. SPI sends heartbeat messages + * in configurable time interval to other nodes to notify them about its state. + * <p> + * If not provided, default value is {@link #DFLT_HEARTBEAT_FREQ}. + * + * @param hbFreq Heartbeat frequency in milliseconds. + */ + @IgniteSpiConfiguration(optional = true) + public TcpDiscoverySpi setHeartbeatFrequency(long hbFreq) { + this.hbFreq = hbFreq; + + return this; } - /** {@inheritDoc} */ - @Override public void spiStart(String gridName) throws IgniteSpiException { - spiStart0(false); + /** + * @return Size of topology snapshots history. + */ + public long getTopHistorySize() { + return topHistSize; } /** - * Starts or restarts SPI after stop (to reconnect). + * Sets size of topology snapshots history. Specified size should be greater than or equal to default size + * {@link #DFLT_TOP_HISTORY_SIZE}. * - * @param restart {@code True} if SPI is restarted after stop. - * @throws IgniteSpiException If failed. + * @param topHistSize Size of topology snapshots history. */ - private void spiStart0(boolean restart) throws IgniteSpiException { - if (!restart) - // It is initial start. - onSpiStart(); + @IgniteSpiConfiguration(optional = true) + public TcpDiscoverySpi setTopHistorySize(int topHistSize) { + if (topHistSize < DFLT_TOP_HISTORY_SIZE) { + U.warn(log, "Topology history size should be greater than or equal to default size. " + + "Specified size will not be set [curSize=" + this.topHistSize + ", specifiedSize=" + topHistSize + + ", defaultSize=" + DFLT_TOP_HISTORY_SIZE + ']'); - synchronized (mux) { - spiState = DISCONNECTED; + return this; } - if (debugMode) { - if (!log.isInfoEnabled()) - throw new IgniteSpiException("Info log level should be enabled for TCP discovery to work " + - "in debug mode."); + this.topHistSize = topHistSize; - debugLog = new ConcurrentLinkedDeque<>(); - - U.quietAndWarn(log, "TCP discovery SPI is configured in debug mode."); - } + return this; + } - // Clear addresses collections. - fromAddrs.clear(); - noResAddrs.clear(); + /** {@inheritDoc} */ + @Override public TcpDiscoverySpi setNodeAttributes(Map<String, Object> attrs, IgniteProductVersion ver) { + assert locNodeAttrs == null; + assert locNodeVer == null; - sockTimeoutWorker = new SocketTimeoutWorker(); - sockTimeoutWorker.start(); + if (log.isDebugEnabled()) { + log.debug("Node attributes to set: " + attrs); + log.debug("Node version to set: " + ver); + } - msgWorker = new RingMessageWorker(); - msgWorker.start(); + locNodeAttrs = attrs; + locNodeVer = ver; - tcpSrvr = new TcpServer(); + return this; + } + /** + * @param srvPort Server port. + */ + void initLocalNode(int srvPort, boolean addExtAddrAttr) { // Init local node. IgniteBiTuple<Collection<String>, Collection<String>> addrs; @@ -612,166 +802,190 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov getLocalNodeId(), addrs.get1(), addrs.get2(), - tcpSrvr.port, + srvPort, metricsProvider, locNodeVer); - Collection<InetSocketAddress> extAddrs = addrRslvr == null ? null : - U.resolveAddresses(addrRslvr, F.flat(Arrays.asList(addrs.get1(), addrs.get2())), - locNode.discoveryPort()); + if (addExtAddrAttr) { + Collection<InetSocketAddress> extAddrs = addrRslvr == null ? null : + U.resolveAddresses(addrRslvr, F.flat(Arrays.asList(addrs.get1(), addrs.get2())), + locNode.discoveryPort()); - if (extAddrs != null) - locNodeAttrs.put(createSpiAttributeName(ATTR_EXT_ADDRS), extAddrs); + locNodeAddrs = new LinkedHashSet<>(); + locNodeAddrs.addAll(locNode.socketAddresses()); - locNode.setAttributes(locNodeAttrs); + if (extAddrs != null) { + locNodeAttrs.put(createSpiAttributeName(ATTR_EXT_ADDRS), extAddrs); - locNode.local(true); + locNodeAddrs.addAll(extAddrs); + } + } - locNodeAddrs = getNodeAddresses(locNode); + locNode.setAttributes(locNodeAttrs); + locNode.local(true); if (log.isDebugEnabled()) log.debug("Local node initialized: " + locNode); + } - // Start TCP server thread after local node is initialized. - tcpSrvr.start(); - - ring.localNode(locNode); + /** + * @param node Node. + * @return {@link LinkedHashSet} of internal and external addresses of provided node. + * Internal addresses placed before external addresses. + */ + @SuppressWarnings("TypeMayBeWeakened") + LinkedHashSet<InetSocketAddress> getNodeAddresses(TcpDiscoveryNode node) { + LinkedHashSet<InetSocketAddress> res = new LinkedHashSet<>(node.socketAddresses()); - if (ipFinder.isShared()) - registerLocalNodeAddress(); - else { - if (F.isEmpty(ipFinder.getRegisteredAddresses())) - throw new IgniteSpiException("Non-shared IP finder must have IP addresses specified in " + - "GridTcpDiscoveryIpFinder.getRegisteredAddresses() configuration property " + - "(specify list of IP addresses in configuration)."); + Collection<InetSocketAddress> extAddrs = node.attribute(createSpiAttributeName(ATTR_EXT_ADDRS)); - ipFinderHasLocAddr = ipFinderHasLocalAddress(); - } + if (extAddrs != null) + res.addAll(extAddrs); - if (statsPrintFreq > 0 && log.isInfoEnabled()) { - statsPrinter = new StatisticsPrinter(); - statsPrinter.start(); - } + return res; + } - stats.onJoinStarted(); + /** + * @param node Node. + * @param sameHost Same host flag. + * @return {@link LinkedHashSet} of internal and external addresses of provided node. + * Internal addresses placed before external addresses. + * Internal addresses will be sorted with {@code inetAddressesComparator(sameHost)}. + */ + @SuppressWarnings("TypeMayBeWeakened") + LinkedHashSet<InetSocketAddress> getNodeAddresses(TcpDiscoveryNode node, boolean sameHost) { + List<InetSocketAddress> addrs = U.arrayList(node.socketAddresses()); - joinTopology(); + Collections.sort(addrs, U.inetAddressesComparator(sameHost)); - stats.onJoinFinished(); + LinkedHashSet<InetSocketAddress> res = new LinkedHashSet<>(addrs); - hbsSnd = new HeartbeatsSender(); - hbsSnd.start(); + Collection<InetSocketAddress> extAddrs = node.attribute(createSpiAttributeName(ATTR_EXT_ADDRS)); - chkStatusSnd = new CheckStatusSender(); - chkStatusSnd.start(); + if (extAddrs != null) + res.addAll(extAddrs); - if (ipFinder.isShared()) { - ipFinderCleaner = new IpFinderCleaner(); - ipFinderCleaner.start(); - } + return res; + } - if (log.isDebugEnabled() && !restart) - log.debug(startInfo()); + /** {@inheritDoc} */ + @Override public Collection<Object> injectables() { + return F.<Object>asList(ipFinder); + } - if (restart) - getSpiContext().registerPort(tcpSrvr.port, TCP); + /** {@inheritDoc} */ + @Override public long getSocketTimeout() { + return sockTimeout; } - /** - * @throws IgniteSpiException If failed. - */ - @SuppressWarnings("BusyWait") - private void registerLocalNodeAddress() throws IgniteSpiException { - // Make sure address registration succeeded. - while (true) { - try { - ipFinder.initializeLocalAddresses(locNode.socketAddresses()); + /** {@inheritDoc} */ + @Override public long getAckTimeout() { + return ackTimeout; + } - // Success. - break; - } - catch (IllegalStateException e) { - throw new IgniteSpiException("Failed to register local node address with IP finder: " + - locNode.socketAddresses(), e); - } - catch (IgniteSpiException e) { - LT.error(log, e, "Failed to register local node address in IP finder on start " + - "(retrying every 2000 ms)."); - } + /** {@inheritDoc} */ + @Override public long getNetworkTimeout() { + return netTimeout; + } - try { - U.sleep(2000); - } - catch (IgniteInterruptedCheckedException e) { - throw new IgniteSpiException("Thread has been interrupted.", e); - } - } + /** {@inheritDoc} */ + @Override public int getThreadPriority() { + return threadPri; } - /** - * @throws IgniteSpiException If failed. - */ - private void onSpiStart() throws IgniteSpiException { - startStopwatch(); + /** {@inheritDoc} */ + @Override public long getHeartbeatFrequency() { + return hbFreq; + } - checkParameters(); + /** {@inheritDoc} */ + @Override public String getIpFinderFormatted() { + return ipFinder.toString(); + } - assertParameter(ipFinderCleanFreq > 0, "ipFinderCleanFreq > 0"); - assertParameter(locPort > 1023, "localPort > 1023"); - assertParameter(locPortRange >= 0, "localPortRange >= 0"); - assertParameter(locPort + locPortRange <= 0xffff, "locPort + locPortRange <= 0xffff"); - assertParameter(maxAckTimeout > ackTimeout, "maxAckTimeout > ackTimeout"); - assertParameter(reconCnt > 0, "reconnectCnt > 0"); - assertParameter(maxMissedHbs > 0, "maxMissedHeartbeats > 0"); - assertParameter(maxMissedClientHbs > 0, "maxMissedClientHeartbeats > 0"); - assertParameter(threadPri > 0, "threadPri > 0"); - assertParameter(statsPrintFreq >= 0, "statsPrintFreq >= 0"); + /** {@inheritDoc} */ + @Override public long getNodesJoined() { + return stats.joinedNodesCount(); + } - try { - locHost = U.resolveLocalHost(locAddr); - } - catch (IOException e) { - throw new IgniteSpiException("Unknown local address: " + locAddr, e); - } + /** {@inheritDoc} */ + @Override public long getNodesLeft() { + return stats.leftNodesCount(); + } - if (log.isDebugEnabled()) { - log.debug(configInfo("localHost", locHost.getHostAddress())); - log.debug(configInfo("localPort", locPort)); - log.debug(configInfo("localPortRange", locPortRange)); - log.debug(configInfo("threadPri", threadPri)); - log.debug(configInfo("networkTimeout", netTimeout)); - log.debug(configInfo("sockTimeout", sockTimeout)); - log.debug(configInfo("ackTimeout", ackTimeout)); - log.debug(configInfo("maxAckTimeout", maxAckTimeout)); - log.debug(configInfo("reconnectCount", reconCnt)); - log.debug(configInfo("ipFinder", ipFinder)); - log.debug(configInfo("ipFinderCleanFreq", ipFinderCleanFreq)); - log.debug(configInfo("heartbeatFreq", hbFreq)); - log.debug(configInfo("maxMissedHeartbeats", maxMissedHbs)); - log.debug(configInfo("statsPrintFreq", statsPrintFreq)); - } + /** {@inheritDoc} */ + @Override public long getNodesFailed() { + return stats.failedNodesCount(); + } - // Warn on odd network timeout. - if (netTimeout < 3000) - U.warn(log, "Network timeout is too low (at least 3000 ms recommended): " + netTimeout); + /** {@inheritDoc} */ + @Override public long getPendingMessagesRegistered() { + return stats.pendingMessagesRegistered(); + } - registerMBean(gridName, this, TcpDiscoverySpiMBean.class); + /** {@inheritDoc} */ + @Override public long getPendingMessagesDiscarded() { + return stats.pendingMessagesDiscarded(); + } - if (ipFinder instanceof TcpDiscoveryMulticastIpFinder) { - TcpDiscoveryMulticastIpFinder mcastIpFinder = ((TcpDiscoveryMulticastIpFinder)ipFinder); + /** {@inheritDoc} */ + @Override public long getAvgMessageProcessingTime() { + return stats.avgMessageProcessingTime(); + } - if (mcastIpFinder.getLocalAddress() == null) - mcastIpFinder.setLocalAddress(locAddr); - } + /** {@inheritDoc} */ + @Override public long getMaxMessageProcessingTime() { + return stats.maxMessageProcessingTime(); + } + + /** {@inheritDoc} */ + @Override public int getTotalReceivedMessages() { + return stats.totalReceivedMessages(); } /** {@inheritDoc} */ - @Override public void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException { + @Override public Map<String, Integer> getReceivedMessages() { + return stats.receivedMessages(); + } + + /** {@inheritDoc} */ + @Override public int getTotalProcessedMessages() { + return stats.totalProcessedMessages(); + } + + /** {@inheritDoc} */ + @Override public Map<String, Integer> getProcessedMessages() { + return stats.processedMessages(); + } + + /** {@inheritDoc} */ + @Override public long getCoordinatorSinceTimestamp() { + return stats.coordinatorSinceTimestamp(); + } + + /** {@inheritDoc} */ + @Override protected void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException { super.onContextInitialized0(spiCtx); ctxInitLatch.countDown(); - spiCtx.registerPort(tcpSrvr.port, TCP); + ipFinder.onSpiContextInitialized(spiCtx); + + impl.onContextInitialized0(spiCtx); + } + + /** {@inheritDoc} */ + @Override protected void onContextDestroyed0() { + super.onContextDestroyed0(); + + if (ctxInitLatch.getCount() > 0) + // Safety. + ctxInitLatch.countDown(); + + if (ipFinder != null) + ipFinder.onSpiContextDestroyed(); + + getSpiContext().deregisterPorts(); } /** {@inheritDoc} */ @@ -795,4621 +1009,858 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov } /** {@inheritDoc} */ - @Override public void spiStop() throws IgniteSpiException { - spiStop0(false); + @Override public ClusterNode getLocalNode() { + return locNode; } - /** - * Stops SPI finally or stops SPI for restart. - * - * @param disconnect {@code True} if SPI is being disconnected. - * @throws IgniteSpiException If failed. - */ - private void spiStop0(boolean disconnect) throws IgniteSpiException { - if (ctxInitLatch.getCount() > 0) - // Safety. - ctxInitLatch.countDown(); + /** {@inheritDoc} */ + @Override public void setListener(@Nullable DiscoverySpiListener lsnr) { + this.lsnr = lsnr; + } - if (log.isDebugEnabled()) { - if (disconnect) - log.debug("Disconnecting SPI."); - else - log.debug("Preparing to start local node stop procedure."); - } + /** {@inheritDoc} */ + @Override public TcpDiscoverySpi setDataExchange(DiscoverySpiDataExchange exchange) { + this.exchange = exchange; - if (disconnect) { - synchronized (mux) { - spiState = DISCONNECTING; - } - } + return this; + } - if (msgWorker != null && msgWorker.isAlive() && !disconnect) { - // Send node left message only if it is final stop. - msgWorker.addMessage(new TcpDiscoveryNodeLeftMessage(getLocalNodeId())); + /** {@inheritDoc} */ + @Override public TcpDiscoverySpi setMetricsProvider(DiscoveryMetricsProvider metricsProvider) { + this.metricsProvider = metricsProvider; - synchronized (mux) { - long threshold = U.currentTimeMillis() + netTimeout; + return this; + } - long timeout = netTimeout; + /** {@inheritDoc} */ + @Override public long getGridStartTime() { + assert gridStartTime != 0; - while (spiState != LEFT && timeout > 0) { - try { - mux.wait(timeout); + return gridStartTime; + } - timeout = threshold - U.currentTimeMillis(); - } - catch (InterruptedException ignored) { - Thread.currentThread().interrupt(); + /** + * @param sockAddr Remote address. + * @return Opened socket. + * @throws IOException If failed. + */ + protected Socket openSocket(InetSocketAddress sockAddr) throws IOException { + assert sockAddr != null; - break; - } - } + InetSocketAddress resolved = sockAddr.isUnresolved() ? + new InetSocketAddress(InetAddress.getByName(sockAddr.getHostName()), sockAddr.getPort()) : sockAddr; - if (spiState == LEFT) { - if (log.isDebugEnabled()) - log.debug("Verification for local node leave has been received from coordinator" + - " (continuing stop procedure)."); - } - else if (log.isInfoEnabled()) { - log.info("No verification for local node leave has been received from coordinator" + - " (will stop node anyway)."); - } - } - } + InetAddress addr = resolved.getAddress(); - U.interrupt(tcpSrvr); - U.join(tcpSrvr, log); + assert addr != null; - Collection<SocketReader> tmp; + Socket sock = new Socket(); - synchronized (mux) { - tmp = U.arrayList(readers); - } + sock.bind(new InetSocketAddress(locHost, 0)); - U.interrupt(tmp); - U.joinThreads(tmp, log); + sock.setTcpNoDelay(true); - U.interrupt(hbsSnd); - U.join(hbsSnd, log); + sock.connect(resolved, (int)sockTimeout); - U.interrupt(chkStatusSnd); - U.join(chkStatusSnd, log); + writeToSocket(sock, U.IGNITE_HEADER); - U.interrupt(ipFinderCleaner); - U.join(ipFinderCleaner, log); + return sock; + } - U.interrupt(msgWorker); - U.join(msgWorker, log); + /** + * Writes message to the socket. + * + * @param sock Socket. + * @param data Raw data to write. + * @throws IOException If IO failed or write timed out. + */ + @SuppressWarnings("ThrowFromFinallyBlock") + protected void writeToSocket(Socket sock, byte[] data) throws IOException { + assert sock != null; + assert data != null; - U.interrupt(sockTimeoutWorker); - U.join(sockTimeoutWorker, log); + SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout); - U.interrupt(statsPrinter); - U.join(statsPrinter, log); + sockTimeoutWorker.addTimeoutObject(obj); - if (ipFinder != null) - ipFinder.close(); + IOException err = null; - Collection<TcpDiscoveryNode> rmts = null; + try { + OutputStream out = sock.getOutputStream(); - if (!disconnect) { - // This is final stop. - unregisterMBean(); + out.write(data); - if (log.isDebugEnabled()) - log.debug(stopInfo()); + out.flush(); } - else { - getSpiContext().deregisterPorts(); - - rmts = ring.visibleRemoteNodes(); + catch (IOException e) { + err = e; } + finally { + boolean cancelled = obj.cancel(); - long topVer = ring.topologyVersion(); - - ring.clear(); - - if (rmts != null && !rmts.isEmpty()) { - // This is restart/disconnection and remote nodes are not empty. - // We need to fire FAIL event for each. - DiscoverySpiListener lsnr = this.lsnr; + if (cancelled) + sockTimeoutWorker.removeTimeoutObject(obj); - if (lsnr != null) { - Set<ClusterNode> processed = new HashSet<>(); + // Throw original exception. + if (err != null) + throw err; - for (TcpDiscoveryNode n : rmts) { - assert n.visible(); + if (!cancelled) + throw new SocketTimeoutException("Write timed out (socket was concurrently closed)."); + } + } - processed.add(n); + /** + * Writes message to the socket. + * + * @param sock Socket. + * @param msg Message. + * @throws IOException If IO failed or write timed out. + * @throws IgniteCheckedException If marshalling failed. + */ + protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg) throws IOException, IgniteCheckedException { + writeToSocket(sock, msg, new GridByteArrayOutputStream(8 * 1024)); // 8K. + } - List<ClusterNode> top = U.arrayList(rmts, F.notIn(processed)); + /** + * Writes message to the socket. + * + * @param sock Socket. + * @param msg Message. + * @param bout Byte array output stream. + * @throws IOException If IO failed or write timed out. + * @throws IgniteCheckedException If marshalling failed. + */ + @SuppressWarnings("ThrowFromFinallyBlock") + protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, GridByteArrayOutputStream bout) + throws IOException, IgniteCheckedException { + assert sock != null; + assert msg != null; + assert bout != null; - topVer++; + // Marshall message first to perform only write after. + marsh.marshal(msg, bout); - Map<Long, Collection<ClusterNode>> hist = updateTopologyHistory(topVer, - Collections.unmodifiableList(top)); + SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout); - lsnr.onDiscovery(EVT_NODE_FAILED, topVer, n, top, hist, null); - } - } - } + sockTimeoutWorker.addTimeoutObject(obj); - printStatistics(); + IOException err = null; - stats.clear(); + try { + OutputStream out = sock.getOutputStream(); - synchronized (mux) { - // Clear stored data. - leavingNodes.clear(); - failedNodes.clear(); + bout.writeTo(out); - spiState = DISCONNECTED; + out.flush(); } - } + catch (IOException e) { + err = e; + } + finally { + boolean cancelled = obj.cancel(); - /** {@inheritDoc} */ - @Override protected void onContextDestroyed0() { - super.onContextDestroyed0(); + if (cancelled) + sockTimeoutWorker.removeTimeoutObject(obj); - if (ctxInitLatch.getCount() > 0) - // Safety. - ctxInitLatch.countDown(); + // Throw original exception. + if (err != null) + throw err; - getSpiContext().deregisterPorts(); + if (!cancelled) + throw new SocketTimeoutException("Write timed out (socket was concurrently closed)."); + } } /** - * @throws IgniteSpiException If any error occurs. - * @return {@code true} if IP finder contains local address. + * Writes response to the socket. + * + * @param sock Socket. + * @param res Integer response. + * @throws IOException If IO failed or write timed out. */ - private boolean ipFinderHasLocalAddress() throws IgniteSpiException { - for (InetSocketAddress locAddr : locNodeAddrs) { - for (InetSocketAddress addr : registeredAddresses()) - try { - int port = addr.getPort(); - - InetSocketAddress resolved = addr.isUnresolved() ? - new InetSocketAddress(InetAddress.getByName(addr.getHostName()), port) : - new InetSocketAddress(addr.getAddress(), port); - - if (resolved.equals(locAddr)) - return true; - } - catch (UnknownHostException e) { - onException(e.getMessage(), e); - } - } + @SuppressWarnings("ThrowFromFinallyBlock") + protected void writeToSocket(Socket sock, int res) throws IOException { + assert sock != null; - return false; - } + SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout); - /** {@inheritDoc} */ - @Override public boolean pingNode(UUID nodeId) { - assert nodeId != null; + sockTimeoutWorker.addTimeoutObject(obj); - if (log.isDebugEnabled()) - log.debug("Pinging node: " + nodeId + "]."); + OutputStream out = sock.getOutputStream(); - if (nodeId == getLocalNodeId()) - return true; + IOException err = null; - TcpDiscoveryNode node = ring.node(nodeId); + try { + out.write(res); - if (node == null || !node.visible()) - return false; + out.flush(); + } + catch (IOException e) { + err = e; + } + finally { + boolean cancelled = obj.cancel(); - boolean res = pingNode(node); + if (cancelled) + sockTimeoutWorker.removeTimeoutObject(obj); - if (!res && !node.isClient()) { - LT.warn(log, null, "Failed to ping node (status check will be initiated): " + nodeId); + // Throw original exception. + if (err != null) + throw err; - msgWorker.addMessage(new TcpDiscoveryStatusCheckMessage(locNode, node.id())); + if (!cancelled) + throw new SocketTimeoutException("Write timed out (socket was concurrently closed)."); } - - return res; } /** - * Pings the remote node to see if it's alive. + * Reads message from the socket limiting read time. * - * @param node Node. - * @return {@code True} if ping succeeds. + * @param sock Socket. + * @param in Input stream (in case socket stream was wrapped). + * @param timeout Socket timeout for this operation. + * @return Message. + * @throws IOException If IO failed or read timed out. + * @throws IgniteCheckedException If unmarshalling failed. */ - private boolean pingNode(TcpDiscoveryNode node) { - assert node != null; - - if (node.id().equals(getLocalNodeId())) - return true; - - UUID clientNodeId = null; + protected <T> T readMessage(Socket sock, @Nullable InputStream in, long timeout) throws IOException, IgniteCheckedException { + assert sock != null; - if (node.isClient()) { - clientNodeId = node.id(); + int oldTimeout = sock.getSoTimeout(); - node = ring.node(node.clientRouterNodeId()); + try { + sock.setSoTimeout((int)timeout); - if (node == null || !node.visible()) - return false; + return marsh.unmarshal(in == null ? sock.getInputStream() : in, U.gridClassLoader()); } + catch (IOException | IgniteCheckedException e) { + if (X.hasCause(e, SocketTimeoutException.class)) + LT.warn(log, null, "Timed out waiting for message to be read (most probably, the reason is " + + "in long GC pauses on remote node. Current timeout: " + timeout + '.'); - for (InetSocketAddress addr : getNodeAddresses(node, U.sameMacs(locNode, node))) { + throw e; + } + finally { + // Quietly restore timeout. try { - // ID returned by the node should be the same as ID of the parameter for ping to succeed. - IgniteBiTuple<UUID, Boolean> t = pingNode(addr, clientNodeId); - - return node.id().equals(t.get1()) && (clientNodeId == null || t.get2()); + sock.setSoTimeout(oldTimeout); } - catch (IgniteCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to ping node [node=" + node + ", err=" + e.getMessage() + ']'); - - onException("Failed to ping node [node=" + node + ", err=" + e.getMessage() + ']', e); - // continue; + catch (SocketException ignored) { + // No-op. } } - - return false; } /** - * Pings the node by its address to see if it's alive. + * Reads message delivery receipt from the socket. * - * @param addr Address of the node. - * @return ID of the remote node and "client exists" flag if node alive. - * @throws IgniteSpiException If an error occurs. + * @param sock Socket. + * @param timeout Socket timeout for this operation. + * @return Receipt. + * @throws IOException If IO failed or read timed out. */ - private IgniteBiTuple<UUID, Boolean> pingNode(InetSocketAddress addr, @Nullable UUID clientNodeId) - throws IgniteCheckedException { - assert addr != null; + protected int readReceipt(Socket sock, long timeout) throws IOException { + assert sock != null; - UUID locNodeId = getLocalNodeId(); + int oldTimeout = sock.getSoTimeout(); + + try { + sock.setSoTimeout((int)timeout); - if (F.contains(locNodeAddrs, addr)) { - if (clientNodeId == null) - return F.t(getLocalNodeId(), false); + int res = sock.getInputStream().read(); - ClientMessageWorker clientWorker = clientMsgWorkers.get(clientNodeId); + if (res == -1) + throw new EOFException(); - if (clientWorker == null) - return F.t(getLocalNodeId(), false); + return res; + } + catch (SocketTimeoutException e) { + LT.warn(log, null, "Timed out waiting for message delivery receipt (most probably, the reason is " + + "in long GC pauses on remote node; consider tuning GC and increasing 'ackTimeout' " + + "configuration property). Will retry to send message with increased timeout. " + + "Current timeout: " + timeout + '.'); - boolean clientPingRes; + stats.onAckTimeout(); + throw e; + } + finally { + // Quietly restore timeout. try { - clientPingRes = clientWorker.ping(); + sock.setSoTimeout(oldTimeout); } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - throw new IgniteInterruptedCheckedException(e); + catch (SocketException ignored) { + // No-op. } - - return F.t(getLocalNodeId(), clientPingRes); } + } - GridFutureAdapter<IgniteBiTuple<UUID, Boolean>> fut = new GridFutureAdapter<>(); - - IgniteInternalFuture<IgniteBiTuple<UUID, Boolean>> oldFut = pingMap.putIfAbsent(addr, fut); + /** + * Resolves addresses registered in the IP finder, removes duplicates and local host + * address and returns the collection of. + * + * @return Resolved addresses without duplicates and local address (potentially + * empty but never null). + * @throws org.apache.ignite.spi.IgniteSpiException If an error occurs. + */ + protected Collection<InetSocketAddress> resolvedAddresses() throws IgniteSpiException { + List<InetSocketAddress> res = new ArrayList<>(); - if (oldFut != null) - return oldFut.get(); - else { - Collection<Throwable> errs = null; + Collection<InetSocketAddress> addrs; + // Get consistent addresses collection. + while (true) { try { - Socket sock = null; - - for (int i = 0; i < reconCnt; i++) { - try { - if (addr.isUnresolved()) - addr = new InetSocketAddress(InetAddress.getByName(addr.getHostName()), addr.getPort()); + addrs = registeredAddresses(); - long tstamp = U.currentTimeMillis(); - - sock = openSocket(addr); + break; + } + catch (IgniteSpiException e) { + LT.error(log, e, "Failed to get registered addresses from IP finder on start " + + "(retrying every 2000 ms)."); + } - writeToSocket(sock, new TcpDiscoveryPingRequest(locNodeId, clientNodeId)); + try { + U.sleep(2000); + } + catch (IgniteInterruptedCheckedException e) { + throw new IgniteSpiException("Thread has been interrupted.", e); + } + } - TcpDiscoveryPingResponse res = readMessage(sock, null, netTimeout); + for (InetSocketAddress addr : addrs) { + assert addr != null; - if (locNodeId.equals(res.creatorNodeId())) { - if (log.isDebugEnabled()) - log.debug("Ping response from local node: " + res); + try { + InetSocketAddress resolved = addr.isUnresolved() ? + new InetSocketAddress(InetAddress.getByName(addr.getHostName()), addr.getPort()) : addr; - break; - } + if (locNodeAddrs == null || !locNodeAddrs.contains(resolved)) + res.add(resolved); + } + catch (UnknownHostException ignored) { + LT.warn(log, null, "Failed to resolve address from IP finder (host is unknown): " + addr); - stats.onClientSocketInitialized(U.currentTimeMillis() - tstamp); + // Add address in any case. + res.add(addr); + } + } - IgniteBiTuple<UUID, Boolean> t = F.t(res.creatorNodeId(), res.clientExists()); + if (!res.isEmpty()) + Collections.shuffle(res); - fut.onDone(t); - - return t; - } - catch (IOException | IgniteCheckedException e) { - if (errs == null) - errs = new ArrayList<>(); - - errs.add(e); - } - finally { - U.closeQuiet(sock); - } - } - } - catch (Throwable t) { - fut.onDone(t); - - if (t instanceof Error) - throw t; - - throw U.cast(t); - } - finally { - if (!fut.isDone()) - fut.onDone(U.exceptionWithSuppressed("Failed to ping node by address: " + addr, errs)); - - boolean b = pingMap.remove(addr, fut); - - assert b; - } - - return fut.get(); - } - } - - /** {@inheritDoc} */ - @Override public void disconnect() throws IgniteSpiException { - spiStop0(true); - } - - /** {@inheritDoc} */ - @Override public void setAuthenticator(DiscoverySpiNodeAuthenticator nodeAuth) { - this.nodeAuth = nodeAuth; - } - - /** {@inheritDoc} */ - @Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) { - try { - msgWorker.addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt, marsh.marshal(evt))); - } - catch (IgniteCheckedException e) { - throw new IgniteSpiException("Failed to marshal custom event: " + evt, e); - } - } - - /** {@inheritDoc} */ - @Override public void failNode(UUID nodeId) { - ClusterNode node = ring.node(nodeId); - - if (node != null) { - TcpDiscoveryNodeFailedMessage msg = new TcpDiscoveryNodeFailedMessage(getLocalNodeId(), - node.id(), node.order()); - - msgWorker.addMessage(msg); - } - } - - /** - * Tries to join this node to topology. - * - * @throws IgniteSpiException If any error occurs. - */ - private void joinTopology() throws IgniteSpiException { - synchronized (mux) { - assert spiState == CONNECTING || spiState == DISCONNECTED; - - spiState = CONNECTING; - } - - SecurityCredentials locCred = (SecurityCredentials)locNode.getAttributes() - .get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS); - - // Marshal credentials for backward compatibility and security. - marshalCredentials(locNode); - - while (true) { - if (!sendJoinRequestMessage()) { - if (log.isDebugEnabled()) - log.debug("Join request message has not been sent (local node is the first in the topology)."); - - if (nodeAuth != null) { - // Authenticate local node. - try { - SecurityContext subj = nodeAuth.authenticateNode(locNode, locCred); - - if (subj == null) - throw new IgniteSpiException("Authentication failed for local node: " + locNode.id()); - - Map<String, Object> attrs = new HashMap<>(locNode.attributes()); - - attrs.put(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT, - ignite.configuration().getMarshaller().marshal(subj)); - attrs.remove(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS); - - locNode.setAttributes(attrs); - } - catch (IgniteException | IgniteCheckedException e) { - throw new IgniteSpiException("Failed to authenticate local node (will shutdown local node).", e); - } - } - - locNode.order(1); - locNode.internalOrder(1); - - gridStartTime = U.currentTimeMillis(); - - locNode.visible(true); - - ring.clear(); - - ring.topologyVersion(1); - - synchronized (mux) { - topHist.clear(); - - spiState = CONNECTED; - - mux.notifyAll(); - } - - notifyDiscovery(EVT_NODE_JOINED, 1, locNode); - - break; - } - - if (log.isDebugEnabled()) - log.debug("Join request message has been sent (waiting for coordinator response)."); - - synchronized (mux) { - long threshold = U.currentTimeMillis() + netTimeout; - - long timeout = netTimeout; - - while (spiState == CONNECTING && timeout > 0) { - try { - mux.wait(timeout); - - timeout = threshold - U.currentTimeMillis(); - } - catch (InterruptedException ignored) { - Thread.currentThread().interrupt(); - - throw new IgniteSpiException("Thread has been interrupted."); - } - } - - if (spiState == CONNECTED) - break; - else if (spiState == DUPLICATE_ID) - throw duplicateIdError((TcpDiscoveryDuplicateIdMessage)joinRes.get()); - else if (spiState == AUTH_FAILED) - throw authenticationFailedError((TcpDiscoveryAuthFailedMessage)joinRes.get()); - else if (spiState == CHECK_FAILED) - throw checkFailedError((TcpDiscoveryCheckFailedMessage)joinRes.get()); - else if (spiState == LOOPBACK_PROBLEM) { - TcpDiscoveryLoopbackProblemMessage msg = (TcpDiscoveryLoopbackProblemMessage)joinRes.get(); - - boolean locHostLoopback = locHost.isLoopbackAddress(); - - String firstNode = locHostLoopback ? "local" : "remote"; - - String secondNode = locHostLoopback ? "remote" : "local"; - - throw new IgniteSpiException("Failed to add node to topology because " + firstNode + - " node is configured to use loopback address, but " + secondNode + " node is not " + - "(consider changing 'localAddress' configuration parameter) " + - "[locNodeAddrs=" + U.addressesAsString(locNode) + ", rmtNodeAddrs=" + - U.addressesAsString(msg.addresses(), msg.hostNames()) + ']'); - } - else - LT.warn(log, null, "Node has not been connected to topology and will repeat join process. " + - "Check remote nodes logs for possible error messages. " + - "Note that large topology may require significant time to start. " + - "Increase 'TcpDiscoverySpi.networkTimeout' configuration property " + - "if getting this message on the starting nodes [networkTimeout=" + netTimeout + ']'); - } - } - - assert locNode.order() != 0; - assert locNode.internalOrder() != 0; - - if (log.isDebugEnabled()) - log.debug("Discovery SPI has been connected to topology with order: " + locNode.internalOrder()); - } + return res; + } /** - * Tries to send join request message to a random node presenting in topology. - * Address is provided by {@link TcpDiscoveryIpFinder} and message is - * sent to first node connection succeeded to. + * Gets addresses registered in the IP finder, initializes addresses having no + * port (or 0 port) with {@link #DFLT_PORT}. * - * @return {@code true} if send succeeded. - * @throws IgniteSpiException If any error occurs. + * @return Registered addresses. + * @throws org.apache.ignite.spi.IgniteSpiException If an error occurs. */ - @SuppressWarnings({"BusyWait"}) - private boolean sendJoinRequestMessage() throws IgniteSpiException { - TcpDiscoveryAbstractMessage joinReq = new TcpDiscoveryJoinRequestMessage(locNode, - collectExchangeData(getLocalNodeId())); - - // Time when it has been detected, that addresses from IP finder do not respond. - long noResStart = 0; - - while (true) { - Collection<InetSocketAddress> addrs = resolvedAddresses(); - - if (F.isEmpty(addrs)) - return false; + protected Collection<InetSocketAddress> registeredAddresses() throws IgniteSpiException { + Collection<InetSocketAddress> res = new ArrayList<>(); - boolean retry = false; - Collection<Exception> errs = new ArrayList<>(); + for (InetSocketAddress addr : ipFinder.getRegisteredAddresses()) { + if (addr.getPort() == 0) { + // TcpDiscoveryNode.discoveryPort() returns an correct port for a server node and 0 for client node. + int port = locNode.discoveryPort() != 0 ? locNode.discoveryPort() : DFLT_PORT; - try (SocketMultiConnector multiConnector = new SocketMultiConnector(this, addrs, 2)) { - GridTuple3<InetSocketAddress, Socket, Exception> tuple; - - while ((tuple = multiConnector.next()) != null) { - InetSocketAddress addr = tuple.get1(); - Socket sock = tuple.get2(); - Exception ex = tuple.get3(); - - if (ex == null) { - assert sock != null; - - try { - Integer res = sendMessageDirectly(joinReq, addr, sock); - - assert res != null; - - noResAddrs.remove(addr); - - // Address is responsive, reset period start. - noResStart = 0; - - switch (res) { - case RES_WAIT: - // Concurrent startup, try sending join request again or wait if no success. - retry = true; - - break; - case RES_OK: - if (log.isDebugEnabled()) - log.debug("Join request message has been sent to address [addr=" + addr + - ", req=" + joinReq + ']'); - - // Join request sending succeeded, wait for response from topology. - return true; - - default: - // Concurrent startup, try next node. - if (res == RES_CONTINUE_JOIN) { - if (!fromAddrs.contains(addr)) - retry = true; - } - else { - if (log.isDebugEnabled()) - log.debug("Unexpected response to join request: " + res); - - retry = true; - } - - break; - } - } - catch (IgniteSpiException e) { - e.printStackTrace(); - - ex = e; - } - } - - if (ex != null) { - errs.add(ex); - - if (log.isDebugEnabled()) { - IOException ioe = X.cause(ex, IOException.class); - - log.debug("Failed to send join request message [addr=" + addr + - ", msg=" + ioe != null ? ioe.getMessage() : ex.getMessage() + ']'); - - onException("Failed to send join request message [addr=" + addr + - ", msg=" + ioe != null ? ioe.getMessage() : ex.getMessage() + ']', ioe); - } - - noResAddrs.add(addr); - } - } + addr = addr.isUnresolved() ? new InetSocketAddress(addr.getHostName(), port) : + new InetSocketAddress(addr.getAddress(), port); } - if (retry) { - if (log.isDebugEnabled()) - log.debug("Concurrent discovery SPI start has been detected (local node should wait)."); - - try { - U.sleep(2000); - } - catch (IgniteInterruptedCheckedException e) { - throw new IgniteSpiException("Thread has been interrupted.", e); - } - } - else if (!ipFinder.isShared() && !ipFinderHasLocAddr) { - IgniteCheckedException e = null; - - if (!errs.isEmpty()) { - e = new IgniteCheckedException("Multiple connection attempts failed."); - - for (Exception err : errs) - e.addSuppressed(err); - } - - if (e != null && X.hasCause(e, ConnectException.class)) - LT.warn(log, null, "Failed to connect to any address from IP finder " + - "(make sure IP finder addresses are correct and firewalls are disabled on all host machines): " + - addrs); - - if (joinTimeout > 0) { - if (noResStart == 0) - noResStart = U.currentTimeMillis(); - else if (U.currentTimeMillis() - noResStart > joinTimeout) - throw new IgniteSpiException( - "Failed to connect to any address from IP finder within join timeout " + - "(make sure IP finder addresses are correct, and operating system firewalls are disabled " + - "on all host machines, or consider increasing 'joinTimeout' configuration property): " + - addrs, e); - } - - try { - U.sleep(2000); - } - catch (IgniteInterruptedCheckedException ex) { - throw new IgniteSpiException("Thread has been interrupted.", ex); - } - } - else - break; + res.add(addr); } - return false; + return res; } /** - * Establishes connection to an address, sends message and returns the response (if any). - * - * @param msg Message to send. - * @param addr Address to send message to. - * @return Response read from the recipient or {@code null} if no response is supposed. - * @throws IgniteSpiException If an error occurs. + * @param msg Message. + * @return Error. */ - @Nullable private Integer sendMessageDirectly(TcpDiscoveryAbstractMessage msg, InetSocketAddress addr, Socket sock) - throws IgniteSpiException { + protected IgniteSpiException duplicateIdError(TcpDiscoveryDuplicateIdMessage msg) { assert msg != null; - assert addr != null; - - Collection<Throwable> errs = null; - - long ackTimeout0 = ackTimeout; - - int connectAttempts = 1; - - boolean joinReqSent = false; - - UUID locNodeId = getLocalNodeId(); - - for (int i = 0; i < reconCnt; i++) { - // Need to set to false on each new iteration, - // since remote node may leave in the middle of the first iteration. - joinReqSent = false; - - boolean openSock = false; - - try { - long tstamp = U.currentTimeMillis(); - - if (sock == null) - sock = openSocket(addr); - - openSock = true; - - // Handshake. - writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId)); - - TcpDiscoveryHandshakeResponse res = readMessage(sock, null, ackTimeout0); - - if (locNodeId.equals(res.creatorNodeId())) { - if (log.isDebugEnabled()) - log.debug("Handshake response from local node: " + res); - - break; - } - - stats.onClientSocketInitialized(U.currentTimeMillis() - tstamp); - - // Send message. - tstamp = U.currentTimeMillis(); - - writeToSocket(sock, msg); - - stats.onMessageSent(msg, U.currentTimeMillis() - tstamp); - - if (debugMode) - debugLog("Message has been sent directly to address [msg=" + msg + ", addr=" + addr + - ", rmtNodeId=" + res.creatorNodeId() + ']'); - - if (log.isDebugEnabled()) - log.debug("Message has been sent directly to address [msg=" + msg + ", addr=" + addr + - ", rmtNodeId=" + res.creatorNodeId() + ']'); - - // Connection has been established, but - // join request may not be unmarshalled on remote host. - // E.g. due to class not found issue. - joinReqSent = msg instanceof TcpDiscoveryJoinRequestMessage; - - return readReceipt(sock, ackTimeout0); - } - catch (ClassCastException e) { - // This issue is rarely reproducible on AmazonEC2, but never - // on dedicated machines. - if (log.isDebugEnabled()) - U.error(log, "Class cast exception on direct send: " + addr, e); - - onException("Class cast exception on direct send: " + addr, e); - - if (errs == null) - errs = new ArrayList<>(); - - errs.add(e); - } - catch (IOException | IgniteCheckedException e) { - if (log.isDebugEnabled()) - log.error("Exception on direct send: " + e.getMessage(), e); - - onException("Exception on direct send: " + e.getMessage(), e); - - if (errs == null) - errs = new ArrayList<>(); - - errs.add(e); - - if (!openSock) { - // Reconnect for the second time, if connection is not established. - if (connectAttempts < 2) { - connectAttempts++; - - continue; - } - - break; // Don't retry if we can not establish connection. - } - - if (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class)) { - ackTimeout0 *= 2; - - if (!checkAckTimeout(ackTimeout0)) - break; - } - } - finally { - U.closeQuiet(sock); - - sock = null; - } - } - - if (joinReqSent) { - if (log.isDebugEnabled()) - log.debug("Join request has been sent, but receipt has not been read (returning RES_WAIT)."); - - // Topology will not include this node, - // however, warning on timed out join will be output. - return RES_OK; - } - throw new IgniteSpiException( - "Failed to send message to address [addr=" + addr + ", msg=" + msg + ']', - U.exceptionWithSuppressed("Failed to send message to address " + - "[addr=" + addr + ", msg=" + msg + ']', errs)); + return new IgniteSpiException("Local node has the same ID as existing node in topology " + + "(fix configuration and restart local node) [localNode=" + locNode + + ", existingNode=" + msg.node() + ']'); } /** - * Marshalls credentials with discovery SPI marshaller (will replace attribute value). - * - * @param node Node to marshall credentials for. - * @throws IgniteSpiException If marshalling failed. + * @param msg Message. + * @return Error. */ - private void marshalCredentials(TcpDiscoveryNode node) throws IgniteSpiException { - try { - // Use security-unsafe getter. - Map<String, Object> attrs = new HashMap<>(node.getAttributes()); - - attrs.put(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS, - marsh.marshal(attrs.get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS))); + protected IgniteSpiException authenticationFailedError(TcpDiscoveryAuthFailedMessage msg) { + assert msg != null; - node.setAttributes(attrs); - } - catch (IgniteCheckedException e) { - throw new IgniteSpiException("Failed to marshal node security credentials: " + node.id(), e); - } + return new IgniteSpiException(new IgniteAuthenticationException("Authentication failed [nodeId=" + + msg.creatorNodeId() + ", addr=" + msg.address().getHostAddress() + ']')); } /** - * Unmarshalls credentials with discovery SPI marshaller (will not replace attribute value). - * - * @param node Node to unmarshall credentials for. - * @return Security credentials. - * @throws IgniteSpiException If unmarshal fails. + * @param msg Message. + * @return Error. */ - private SecurityCredentials unmarshalCredentials(TcpDiscoveryNode node) throws IgniteSpiException { - try { - byte[] credBytes = (byte[])node.getAttributes().get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS); - - if (credBytes == null) - return null; + protected IgniteSpiException checkFailedError(TcpDiscoveryCheckFailedMessage msg) { + assert msg != null; - return marsh.unmarshal(credBytes, null); - } - catch (IgniteCheckedException e) { - throw new IgniteSpiException("Failed to unmarshal node security credentials: " + node.id(), e); - } + return versionCheckFailed(msg) ? new IgniteSpiVersionCheckException(msg.error()) : + new IgniteSpiException(msg.error()); } /** - * @param ackTimeout Acknowledgement timeout. - * @return {@code True} if acknowledgement timeout is less or equal to - * maximum acknowledgement timeout, {@code false} otherwise. + * @param msg Message. + * @return Whether delivery of the message is ensured. */ - private boolean checkAckTimeout(long ackTimeout) { - if (ackTimeout > maxAckTimeout) { - LT.warn(log, null, "Acknowledgement timeout is greater than maximum acknowledgement timeout " + - "(consider increasing 'maxAckTimeout' configuration property) " + - "[ackTimeout=" + ackTimeout + ", maxAckTimeout=" + maxAckTimeout + ']'); - - return false; - } - - return true; + protected boolean ensured(TcpDiscoveryAbstractMessage msg) { + return U.getAnnotation(msg.getClass(), TcpDiscoveryEnsureDelivery.class) != null; } /** - * Notify external listener on discovery event. - * - * @param type Discovery event type. See {@link DiscoveryEvent} for more details. - * @param topVer Topology version. - * @param node Remote node this event is connected with. + * @param msg Failed message. + * @return {@code True} if specified failed message relates to version incompatibility, {@code false} otherwise. + * @deprecated Parsing of error message was used for preserving backward compatibility. We should remove it + * and create separate message for failed version check with next major release. */ - private void notifyDiscovery(int type, long topVer, TcpDiscoveryNode node) { - assert type > 0; - assert node != null; - - DiscoverySpiListener lsnr = this.lsnr; - - TcpDiscoverySpiState spiState = spiStateCopy(); - - if (lsnr != null && node.visible() && (spiState == CONNECTED || spiState == DISCONNECTING)) { - if (log.isDebugEnabled()) - log.debug("Discovery notification [node=" + node + ", spiState=" + spiState + - ", type=" + U.gridEventName(type) + ", topVer=" + topVer + ']'); - - Collection<ClusterNode> top = F.upcast(ring.visibleNodes()); - - Map<Long, Collection<ClusterNode>> hist = updateTopologyHistory(topVer, top); - - lsnr.onDiscovery(type, topVer, node, top, hist, null); - } - else if (log.isDebugEnabled()) - log.debug("Skipped discovery notification [node=" + node + ", spiState=" + spiState + - ", type=" + U.gridEventName(type) + ", topVer=" + topVer + ']'); + @Deprecated + private static boolean versionCheckFailed(TcpDiscoveryCheckFailedMessage msg) { + return msg.error().contains("versions are not compatible"); } /** - * Update topology history with new topology snapshots. - * - * @param topVer Topology version. - * @param top Topology snapshot. - * @return Copy of updated topology history. + * @param nodeId Node ID. + * @return Marshalled exchange data. */ - @Nullable private Map<Long, Collection<ClusterNode>> updateTopologyHistory(long topVer, Collection<ClusterNode> top) { - synchronized (mux) { - if (topHist.containsKey(topVer)) - return null; + protected Map<Integer, byte[]> collectExchangeData(UUID nodeId) { + Map<Integer, Serializable> data = exchange.collect(nodeId); - topHist.put(topVer, top); + if (data == null) + return null; - while (topHist.size() > topHistSize) - topHist.remove(topHist.firstKey()); + Map<Integer, byte[]> data0 = U.newHashMap(data.size()); - if (log.isDebugEnabled()) - log.debug("Added topology snapshot to history, topVer=" + topVer + ", historySize=" + topHist.size()); + for (Map.Entry<Integer, Serializable> entry : data.entrySet()) { + try { + byte[] bytes = marsh.marshal(entry.getValue()); - return new TreeMap<>(topHist); + data0.put(entry.getKey(), bytes); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to marshal discovery data " + + "[comp=" + entry.getKey() + ", data=" + entry.getValue() + ']', e); + } } - } - /** - * @param msg Error message. - * @param e Exception. - */ - private void onException(String msg, Exception e){ - getExceptionRegistry().onException(msg, e); + return data0; } /** - * @param node Node. - * @return {@link LinkedHashSet} of internal and external addresses of provided node. - * Internal addresses placed before external addresses. + * @param joiningNodeID Joining node ID. + * @param nodeId Remote node ID for which data is provided. + * @param data Collection of marshalled discovery data objects from different components. + * @param clsLdr Class loader for discovery data unmarshalling. */ - @SuppressWarnings("TypeMayBeWeakened") - private LinkedHashSet<InetSocketAddress> getNodeAddresses(TcpDiscoveryNode node) { - LinkedHashSet<InetSocketAddress> res = new LinkedHashSet<>(node.socketAddresses()); - - Collection<InetSocketAddress> extAddrs = node.attribute(createSpiAttributeName(ATTR_EXT_ADDRS)); + p
<TRUNCATED>