http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index ed0e9dd..e4ef744 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -18,46 +18,34 @@ package org.apache.ignite.spi.discovery.tcp; import org.apache.ignite.*; -import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; -import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.internal.*; -import org.apache.ignite.internal.events.*; -import org.apache.ignite.internal.processors.security.*; import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.io.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; -import org.apache.ignite.plugin.security.*; +import org.apache.ignite.marshaller.*; +import org.apache.ignite.marshaller.jdk.*; import org.apache.ignite.resources.*; import org.apache.ignite.spi.*; import org.apache.ignite.spi.discovery.*; import org.apache.ignite.spi.discovery.tcp.internal.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; -import org.apache.ignite.spi.discovery.tcp.ipfinder.jdbc.TcpDiscoveryJdbcIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.jdbc.*; import 
org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.sharedfs.TcpDiscoverySharedFsIpFinder; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.sharedfs.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; import org.apache.ignite.spi.discovery.tcp.messages.*; import org.jetbrains.annotations.*; -import org.jsr166.*; import java.io.*; import java.net.*; -import java.text.*; import java.util.*; import java.util.concurrent.*; - -import static org.apache.ignite.events.EventType.*; -import static org.apache.ignite.internal.IgniteNodeAttributes.*; -import static org.apache.ignite.spi.IgnitePortProtocol.*; -import static org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoverySpiState.*; -import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHeartbeatMessage.*; -import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessage.*; +import java.util.concurrent.atomic.*; /** * Discovery SPI implementation that uses TCP/IP for node discovery. @@ -65,6 +53,14 @@ import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusChe * Nodes are organized in ring. So almost all network exchange (except few cases) is * done across it. * <p> + * If the node is configured as a client node (see {@link IgniteConfiguration#isClientMode()}), + * TcpDiscoverySpi starts in client mode as well. In this case the node does not take a place in the ring; + * instead, it connects to a random node in the ring (its IP is taken from the configured IP finder) and + * uses it as a router for discovery traffic. + * Therefore, a slow client node or its shutdown does not affect the whole cluster. If TcpDiscoverySpi + * needs to be started in server mode regardless of {@link IgniteConfiguration#isClientMode()}, + * force server mode should be enabled via {@link #setForceServerMode(boolean)}.
+ * <p> * At startup SPI tries to send messages to random IP taken from * {@link TcpDiscoveryIpFinder} about self start (stops when send succeeds) * and then this info goes to coordinator. When coordinator processes join request @@ -105,6 +101,7 @@ import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusChe * <li>Thread priority for threads started by SPI (see {@link #setThreadPriority(int)})</li> * <li>IP finder clean frequency (see {@link #setIpFinderCleanFrequency(long)})</li> * <li>Statistics print frequency (see {@link #setStatisticsPrintFrequency(long)}</li> + * <li>Force server mode (see {@link #setForceServerMode(boolean)})</li> * </ul> * <h2 class="header">Java Example</h2> * <pre name="code" class="java"> @@ -148,13 +145,43 @@ import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusChe @IgniteSpiMultipleInstancesSupport(true) @DiscoverySpiOrderSupport(true) @DiscoverySpiHistorySupport(true) -public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscoverySpiMBean { +public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, TcpDiscoverySpiMBean { + /** Node attribute that is mapped to node's external addresses (value is <tt>disc.tcp.ext-addrs</tt>). */ + public static final String ATTR_EXT_ADDRS = "disc.tcp.ext-addrs"; + /** Default local port range (value is <tt>100</tt>). */ public static final int DFLT_PORT_RANGE = 100; + /** Default port to listen on (value is <tt>47500</tt>). */ + public static final int DFLT_PORT = 47500; + /** Default timeout for joining topology (value is <tt>0</tt>). */ public static final long DFLT_JOIN_TIMEOUT = 0; + /** Default network timeout in milliseconds (value is <tt>5000ms</tt>). */ + public static final long DFLT_NETWORK_TIMEOUT = 5000; + + /** Default value for thread priority (value is <tt>10</tt>). */ + public static final int DFLT_THREAD_PRI = 10; + + /** Default heartbeat messages issuing frequency (value is <tt>100ms</tt>).
*/ + public static final long DFLT_HEARTBEAT_FREQ = 100; + + /** Default size of topology snapshots history (value is <tt>1000</tt>). */ + public static final int DFLT_TOP_HISTORY_SIZE = 1000; + + /** Default socket operations timeout in milliseconds (value is <tt>200ms</tt>). */ + public static final long DFLT_SOCK_TIMEOUT = 200; + + /** Default timeout for receiving message acknowledgement in milliseconds (value is <tt>50ms</tt>). */ + public static final long DFLT_ACK_TIMEOUT = 50; + + /** Default socket operations timeout for client nodes in milliseconds (value is <tt>700ms</tt>). */ + public static final long DFLT_SOCK_TIMEOUT_CLIENT = 700; + + /** Default timeout for receiving message acknowledgement on client nodes in milliseconds (value is <tt>700ms</tt>). */ + public static final long DFLT_ACK_TIMEOUT_CLIENT = 700; + /** Default reconnect attempts count (value is <tt>10</tt>). */ public static final int DFLT_RECONNECT_CNT = 10; @@ -173,133 +200,252 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov /** Maximum ack timeout value for receiving message acknowledgement in milliseconds (value is <tt>600,000ms</tt>). */ public static final long DFLT_MAX_ACK_TIMEOUT = 10 * 60 * 1000; - /** Node attribute that is mapped to node's external addresses (value is <tt>disc.tcp.ext-addrs</tt>). */ - public static final String ATTR_EXT_ADDRS = "disc.tcp.ext-addrs"; + /** Local address. */ + protected String locAddr; /** Address resolver. */ private AddressResolver addrRslvr; + /** IP finder. */ + protected TcpDiscoveryIpFinder ipFinder; + + /** Socket operations timeout. */ + protected long sockTimeout; // Must be initialized in the constructor of child class. + + /** Message acknowledgement timeout. */ + protected long ackTimeout; // Must be initialized in the constructor of child class. + + /** Network timeout. */ + protected long netTimeout = DFLT_NETWORK_TIMEOUT; + + /** Join timeout.
*/ + @SuppressWarnings("RedundantFieldInitialization") + protected long joinTimeout = DFLT_JOIN_TIMEOUT; + + /** Thread priority for all threads started by SPI. */ + protected int threadPri = DFLT_THREAD_PRI; + + /** Heartbeat messages issuing frequency. */ + protected long hbFreq = DFLT_HEARTBEAT_FREQ; + + /** Size of topology snapshots history. */ + protected int topHistSize = DFLT_TOP_HISTORY_SIZE; + + /** Grid discovery listener. */ + protected volatile DiscoverySpiListener lsnr; + + /** Data exchange. */ + protected DiscoverySpiDataExchange exchange; + + /** Metrics provider. */ + protected DiscoveryMetricsProvider metricsProvider; + + /** Local node attributes. */ + protected Map<String, Object> locNodeAttrs; + + /** Local node version. */ + protected IgniteProductVersion locNodeVer; + + /** Local node. */ + protected TcpDiscoveryNode locNode; + + /** Local host. */ + protected InetAddress locHost; + + /** Internal and external addresses of local node. */ + protected Collection<InetSocketAddress> locNodeAddrs; + + /** Start time of the very first grid node. */ + protected volatile long gridStartTime; + + /** Marshaller. */ + protected final Marshaller marsh = new JdkMarshaller(); + + /** Statistics. */ + protected final TcpDiscoveryStatistics stats = new TcpDiscoveryStatistics(); + /** Local port which node uses. */ - private int locPort = DFLT_PORT; + protected int locPort = DFLT_PORT; /** Local port range. */ - private int locPortRange = DFLT_PORT_RANGE; + protected int locPortRange = DFLT_PORT_RANGE; + + /** Reconnect attempts count. */ + @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) + protected int reconCnt = DFLT_RECONNECT_CNT; /** Statistics print frequency. */ @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized", "RedundantFieldInitialization"}) - private long statsPrintFreq = DFLT_STATS_PRINT_FREQ; + protected long statsPrintFreq = DFLT_STATS_PRINT_FREQ; /** Maximum message acknowledgement timeout. 
*/ - private long maxAckTimeout = DFLT_MAX_ACK_TIMEOUT; - - /** Join timeout. */ - @SuppressWarnings("RedundantFieldInitialization") - private long joinTimeout = DFLT_JOIN_TIMEOUT; + protected long maxAckTimeout = DFLT_MAX_ACK_TIMEOUT; /** Max heartbeats count node can miss without initiating status check. */ - private int maxMissedHbs = DFLT_MAX_MISSED_HEARTBEATS; + protected int maxMissedHbs = DFLT_MAX_MISSED_HEARTBEATS; /** Max heartbeats count node can miss without failing client node. */ - private int maxMissedClientHbs = DFLT_MAX_MISSED_CLIENT_HEARTBEATS; + protected int maxMissedClientHbs = DFLT_MAX_MISSED_CLIENT_HEARTBEATS; /** IP finder clean frequency. */ @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) - private long ipFinderCleanFreq = DFLT_IP_FINDER_CLEAN_FREQ; + protected long ipFinderCleanFreq = DFLT_IP_FINDER_CLEAN_FREQ; - /** Reconnect attempts count. */ - @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) - private int reconCnt = DFLT_RECONNECT_CNT; + /** Node authenticator. */ + protected DiscoverySpiNodeAuthenticator nodeAuth; - /** Nodes ring. */ + /** Context initialization latch. */ @GridToStringExclude - private final TcpDiscoveryNodesRing ring = new TcpDiscoveryNodesRing(); - - /** Topology snapshots history. */ - private final SortedMap<Long, Collection<ClusterNode>> topHist = new TreeMap<>(); - - /** Socket readers. */ - private final Collection<SocketReader> readers = new LinkedList<>(); + private final CountDownLatch ctxInitLatch = new CountDownLatch(1); - /** TCP server for discovery SPI. */ - private TcpServer tcpSrvr; + /** */ + protected final CopyOnWriteArrayList<IgniteInClosure<TcpDiscoveryAbstractMessage>> sendMsgLsnrs = + new CopyOnWriteArrayList<>(); - /** Message worker. 
*/ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private RingMessageWorker msgWorker; + /** */ + protected final CopyOnWriteArrayList<IgniteInClosure<Socket>> incomeConnLsnrs = + new CopyOnWriteArrayList<>(); - /** Client message workers. */ - private ConcurrentMap<UUID, ClientMessageWorker> clientMsgWorkers = new ConcurrentHashMap8<>(); + /** Logger. */ + @LoggerResource + protected IgniteLogger log; - /** Metrics sender. */ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private HeartbeatsSender hbsSnd; + /** */ + protected TcpDiscoveryImpl impl; - /** Status checker. */ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private CheckStatusSender chkStatusSnd; + /** */ + private boolean forceSrvMode; - /** IP finder cleaner. */ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private IpFinderCleaner ipFinderCleaner; + /** {@inheritDoc} */ + @Override public String getSpiState() { + return impl.getSpiState(); + } - /** Statistics printer thread. */ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private StatisticsPrinter statsPrinter; + /** {@inheritDoc} */ + @Override public int getMessageWorkerQueueSize() { + return impl.getMessageWorkerQueueSize(); + } - /** Failed nodes (but still in topology). */ - private Collection<TcpDiscoveryNode> failedNodes = new HashSet<>(); + /** {@inheritDoc} */ + @Nullable @Override public UUID getCoordinator() { + return impl.getCoordinator(); + } - /** Leaving nodes (but still in topology). */ - private Collection<TcpDiscoveryNode> leavingNodes = new HashSet<>(); + /** {@inheritDoc} */ + @Override public Collection<ClusterNode> getRemoteNodes() { + return impl.getRemoteNodes(); + } - /** If non-shared IP finder is used this flag shows whether IP finder contains local address. 
*/ - private boolean ipFinderHasLocAddr; + /** {@inheritDoc} */ + @Nullable @Override public ClusterNode getNode(UUID nodeId) { + return impl.getNode(nodeId); + } - /** Addresses that do not respond during join requests send (for resolving concurrent start). */ - private final Collection<SocketAddress> noResAddrs = new GridConcurrentHashSet<>(); + /** {@inheritDoc} */ + @Override public boolean pingNode(UUID nodeId) { + return impl.pingNode(nodeId); + } - /** Addresses that incoming join requests send were send from (for resolving concurrent start). */ - private final Collection<SocketAddress> fromAddrs = new GridConcurrentHashSet<>(); + /** {@inheritDoc} */ + @Override public void disconnect() throws IgniteSpiException { + impl.disconnect(); + } - /** Response on join request from coordinator (in case of duplicate ID or auth failure). */ - private final GridTuple<TcpDiscoveryAbstractMessage> joinRes = F.t1(); + /** {@inheritDoc} */ + @Override public void setAuthenticator(DiscoverySpiNodeAuthenticator auth) { + nodeAuth = auth; + } - /** Context initialization latch. */ - @GridToStringExclude - private final CountDownLatch ctxInitLatch = new CountDownLatch(1); + /** {@inheritDoc} */ + @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException { + impl.sendCustomEvent(msg); + } - /** Node authenticator. */ - private DiscoverySpiNodeAuthenticator nodeAuth; + /** {@inheritDoc} */ + @Override public void failNode(UUID nodeId) { + impl.failNode(nodeId); + } - /** Mutex. */ - private final Object mux = new Object(); + /** {@inheritDoc} */ + @Override public void dumpDebugInfo() { + impl.dumpDebugInfo(log); + } - /** Map with proceeding ping requests. 
*/ - private final ConcurrentMap<InetSocketAddress, IgniteInternalFuture<IgniteBiTuple<UUID, Boolean>>> pingMap = - new ConcurrentHashMap8<>(); + /** {@inheritDoc} */ + @Override public boolean isClientMode() { + if (impl == null) + throw new IllegalStateException("TcpDiscoverySpi has not started"); - /** Debug mode. */ - private boolean debugMode; + return impl instanceof ClientImpl; + } - /** Debug messages history. */ - private int debugMsgHist = 512; + /** + * If {@code true}, TcpDiscoverySpi will be started in server mode regardless + * of {@link IgniteConfiguration#isClientMode()}. + * + * @return forceServerMode flag. + */ + public boolean isForceServerMode() { + return forceSrvMode; + } - /** Received messages. */ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private ConcurrentLinkedDeque<String> debugLog; + /** + * Sets force server mode flag. + * <p> + * If {@code true}, TcpDiscoverySpi is started in server mode regardless + * of {@link IgniteConfiguration#isClientMode()}. + * + * @param forceSrvMode forceServerMode flag. + * @return {@code this} for chaining. + */ + @IgniteSpiConfiguration(optional = true) + public TcpDiscoverySpi setForceServerMode(boolean forceSrvMode) { + this.forceSrvMode = forceSrvMode; - /** */ - private final CopyOnWriteArrayList<IgniteInClosure<TcpDiscoveryAbstractMessage>> sendMsgLsnrs = - new CopyOnWriteArrayList<>(); + return this; + } - /** {@inheritDoc} */ + /** + * Injects resources. + * + * @param ignite Ignite. + */ @IgniteInstanceResource - @Override public void injectResources(Ignite ignite) { + @Override protected void injectResources(Ignite ignite) { super.injectResources(ignite); // Inject resource. - if (ignite != null) + if (ignite != null) { + setLocalAddress(ignite.configuration().getLocalHost()); setAddressResolver(ignite.configuration().getAddressResolver()); + } + } + + /** + * Sets local host IP address that discovery SPI uses.
+ * <p> + * If not provided, by default a first found non-loopback address + * will be used. If there is no non-loopback address available, + * then {@link InetAddress#getLocalHost()} will be used. + * + * @param locAddr IP address. + */ + @IgniteSpiConfiguration(optional = true) + public TcpDiscoverySpi setLocalAddress(String locAddr) { + // Injection should not override value already set by Spring or user. + if (this.locAddr == null) + this.locAddr = locAddr; + + return this; + } + + /** + * Gets local address that was set to SPI with {@link #setLocalAddress(String)} method. + * + * @return local address. + */ + public String getLocalAddress() { + return locAddr; } /** @@ -340,8 +486,10 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov * @see #setAckTimeout(long) */ @IgniteSpiConfiguration(optional = true) - public void setReconnectCount(int reconCnt) { + public TcpDiscoverySpi setReconnectCount(int reconCnt) { this.reconCnt = reconCnt; + + return this; } /** {@inheritDoc} */ @@ -358,36 +506,16 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov * is reached, then the process of message sending is considered as failed. * <p> * If not specified, default is {@link #DFLT_MAX_ACK_TIMEOUT}. + * <p> + * Affected server nodes only. * * @param maxAckTimeout Maximum acknowledgement timeout. */ @IgniteSpiConfiguration(optional = true) - public void setMaxAckTimeout(long maxAckTimeout) { + public TcpDiscoverySpi setMaxAckTimeout(long maxAckTimeout) { this.maxAckTimeout = maxAckTimeout; - } - - /** {@inheritDoc} */ - @Override public long getJoinTimeout() { - return joinTimeout; - } - /** - * Sets join timeout. - * <p> - * If non-shared IP finder is used and node fails to connect to - * any address from IP finder, node keeps trying to join within this - * timeout. If all addresses are still unresponsive, exception is thrown - * and node startup fails. 
- * <p> - * If not specified, default is {@link #DFLT_JOIN_TIMEOUT}. - * - * @param joinTimeout Join timeout ({@code 0} means wait forever). - * - * @see TcpDiscoveryIpFinder#isShared() - */ - @IgniteSpiConfiguration(optional = true) - public void setJoinTimeout(long joinTimeout) { - this.joinTimeout = joinTimeout; + return this; } /** {@inheritDoc} */ @@ -401,12 +529,16 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov * Sets local port to listen to. * <p> * If not specified, default is {@link #DFLT_PORT}. + * <p> + * Affected server nodes only. * * @param locPort Local port to bind. */ @IgniteSpiConfiguration(optional = true) - public void setLocalPort(int locPort) { + public TcpDiscoverySpi setLocalPort(int locPort) { this.locPort = locPort; + + return this; } /** {@inheritDoc} */ @@ -420,12 +552,17 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov * <tt>{@link #getLocalPort()} {@code + locPortRange}</tt>. * <p> * If not specified, default is {@link #DFLT_PORT_RANGE}. + * <p> + * Affected server nodes only. + * * @param locPortRange Local port range to bind. */ @IgniteSpiConfiguration(optional = true) - public void setLocalPortRange(int locPortRange) { + public TcpDiscoverySpi setLocalPortRange(int locPortRange) { this.locPortRange = locPortRange; + + return this; } /** {@inheritDoc} */ @@ -437,12 +574,16 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov * Sets max heartbeats count node can miss without initiating status check. * <p> * If not provided, default value is {@link #DFLT_MAX_MISSED_HEARTBEATS}. + * <p> + * Affected server nodes only. * * @param maxMissedHbs Max missed heartbeats. 
*/ @IgniteSpiConfiguration(optional = true) - public void setMaxMissedHeartbeats(int maxMissedHbs) { + public TcpDiscoverySpi setMaxMissedHeartbeats(int maxMissedHbs) { this.maxMissedHbs = maxMissedHbs; + + return this; } /** {@inheritDoc} */ @@ -458,8 +599,10 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov * @param maxMissedClientHbs Max missed client heartbeats. */ @IgniteSpiConfiguration(optional = true) - public void setMaxMissedClientHeartbeats(int maxMissedClientHbs) { + public TcpDiscoverySpi setMaxMissedClientHeartbeats(int maxMissedClientHbs) { this.maxMissedClientHbs = maxMissedClientHbs; + + return this; } /** {@inheritDoc} */ @@ -479,8 +622,10 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov * @param statsPrintFreq Statistics print frequency in milliseconds. */ @IgniteSpiConfiguration(optional = true) - public void setStatisticsPrintFrequency(long statsPrintFreq) { + public TcpDiscoverySpi setStatisticsPrintFrequency(long statsPrintFreq) { this.statsPrintFreq = statsPrintFreq; + + return this; } /** {@inheritDoc} */ @@ -492,211 +637,191 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov * Sets IP finder clean frequency in milliseconds. * <p> * If not provided, default value is {@link #DFLT_IP_FINDER_CLEAN_FREQ} + * <p> + * Affected server nodes only. * * @param ipFinderCleanFreq IP finder clean frequency. */ @IgniteSpiConfiguration(optional = true) - public void setIpFinderCleanFrequency(long ipFinderCleanFreq) { + public TcpDiscoverySpi setIpFinderCleanFrequency(long ipFinderCleanFreq) { this.ipFinderCleanFreq = ipFinderCleanFreq; + + return this; } /** - * This method is intended for troubleshooting purposes only. + * Gets IP finder for IP addresses sharing and storing. * - * @param debugMode {code True} to start SPI in debug mode. + * @return IP finder for IP addresses sharing and storing. 
*/ - public void setDebugMode(boolean debugMode) { - this.debugMode = debugMode; + public TcpDiscoveryIpFinder getIpFinder() { + return ipFinder; } /** - * This method is intended for troubleshooting purposes only. + * Sets IP finder for IP addresses sharing and storing. + * <p> + * If not provided {@link org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder} will be used by default. * - * @param debugMsgHist Message history log size. + * @param ipFinder IP finder. */ - public void setDebugMessageHistory(int debugMsgHist) { - this.debugMsgHist = debugMsgHist; - } + @IgniteSpiConfiguration(optional = true) + public TcpDiscoverySpi setIpFinder(TcpDiscoveryIpFinder ipFinder) { + this.ipFinder = ipFinder; - /** {@inheritDoc} */ - @Override public String getSpiState() { - synchronized (mux) { - return spiState.name(); - } + return this; } - /** {@inheritDoc} */ - @Override public long getSocketTimeout() { - return sockTimeout; - } + /** + * Sets socket operations timeout. This timeout is used to limit connection time and + * write-to-socket time. + * <p> + * Note that when running Ignite on Amazon EC2, socket timeout must be set to a value + * significantly greater than the default (e.g. to {@code 30000}). + * <p> + * If not specified, default is {@link #DFLT_SOCK_TIMEOUT} or {@link #DFLT_SOCK_TIMEOUT_CLIENT}. + * + * @param sockTimeout Socket connection timeout. + */ + @IgniteSpiConfiguration(optional = true) + public TcpDiscoverySpi setSocketTimeout(long sockTimeout) { + this.sockTimeout = sockTimeout; - /** {@inheritDoc} */ - @Override public long getAckTimeout() { - return ackTimeout; + return this; } - /** {@inheritDoc} */ - @Override public long getNetworkTimeout() { - return netTimeout; - } + /** + * Sets timeout for receiving acknowledgement for sent message. + * <p> + * If acknowledgement is not received within this timeout, sending is considered as failed + * and SPI tries to repeat message sending. 
+ * <p> + * If not specified, default is {@link #DFLT_ACK_TIMEOUT} or {@link #DFLT_ACK_TIMEOUT_CLIENT}. + * + * @param ackTimeout Acknowledgement timeout. + */ + @IgniteSpiConfiguration(optional = true) + public TcpDiscoverySpi setAckTimeout(long ackTimeout) { + this.ackTimeout = ackTimeout; - /** {@inheritDoc} */ - @Override public int getThreadPriority() { - return threadPri; + return this; } - /** {@inheritDoc} */ - @Override public long getHeartbeatFrequency() { - return hbFreq; - } + /** + * Sets maximum network timeout to use for network operations. + * <p> + * If not specified, default is {@link #DFLT_NETWORK_TIMEOUT}. + * + * @param netTimeout Network timeout. + */ + @IgniteSpiConfiguration(optional = true) + public TcpDiscoverySpi setNetworkTimeout(long netTimeout) { + this.netTimeout = netTimeout; - /** {@inheritDoc} */ - @Override public String getIpFinderFormatted() { - return ipFinder.toString(); + return this; } /** {@inheritDoc} */ - @Override public int getMessageWorkerQueueSize() { - return msgWorker.queueSize(); + @Override public long getJoinTimeout() { + return joinTimeout; } - /** {@inheritDoc} */ - @Override public long getNodesJoined() { - return stats.joinedNodesCount(); - } + /** + * Sets join timeout. + * <p> + * If non-shared IP finder is used and node fails to connect to + * any address from IP finder, node keeps trying to join within this + * timeout. If all addresses are still unresponsive, exception is thrown + * and node startup fails. + * <p> + * If not specified, default is {@link #DFLT_JOIN_TIMEOUT}. + * + * @param joinTimeout Join timeout ({@code 0} means wait forever). 
+ * + * @see TcpDiscoveryIpFinder#isShared() + */ + @IgniteSpiConfiguration(optional = true) + public TcpDiscoverySpi setJoinTimeout(long joinTimeout) { + this.joinTimeout = joinTimeout; - /** {@inheritDoc} */ - @Override public long getNodesLeft() { - return stats.leftNodesCount(); + return this; } - /** {@inheritDoc} */ - @Override public long getNodesFailed() { - return stats.failedNodesCount(); - } + /** + * Sets thread priority. All threads within SPI will be started with it. + * <p> + * If not provided, default value is {@link #DFLT_THREAD_PRI} + * + * @param threadPri Thread priority. + */ + @IgniteSpiConfiguration(optional = true) + public TcpDiscoverySpi setThreadPriority(int threadPri) { + this.threadPri = threadPri; - /** {@inheritDoc} */ - @Override public long getPendingMessagesRegistered() { - return stats.pendingMessagesRegistered(); + return this; } - /** {@inheritDoc} */ - @Override public long getPendingMessagesDiscarded() { - return stats.pendingMessagesDiscarded(); - } + /** + * Sets delay between issuing of heartbeat messages. SPI sends heartbeat messages + * in configurable time interval to other nodes to notify them about its state. + * <p> + * If not provided, default value is {@link #DFLT_HEARTBEAT_FREQ}. + * + * @param hbFreq Heartbeat frequency in milliseconds. 
+ */ + @IgniteSpiConfiguration(optional = true) + public TcpDiscoverySpi setHeartbeatFrequency(long hbFreq) { + this.hbFreq = hbFreq; - /** {@inheritDoc} */ - @Override public long getAvgMessageProcessingTime() { - return stats.avgMessageProcessingTime(); + return this; } - /** {@inheritDoc} */ - @Override public long getMaxMessageProcessingTime() { - return stats.maxMessageProcessingTime(); - } - - /** {@inheritDoc} */ - @Override public int getTotalReceivedMessages() { - return stats.totalReceivedMessages(); - } - - /** {@inheritDoc} */ - @Override public Map<String, Integer> getReceivedMessages() { - return stats.receivedMessages(); - } - - /** {@inheritDoc} */ - @Override public int getTotalProcessedMessages() { - return stats.totalProcessedMessages(); - } - - /** {@inheritDoc} */ - @Override public Map<String, Integer> getProcessedMessages() { - return stats.processedMessages(); - } - - /** {@inheritDoc} */ - @Override public long getCoordinatorSinceTimestamp() { - return stats.coordinatorSinceTimestamp(); - } - - /** {@inheritDoc} */ - @Nullable @Override public UUID getCoordinator() { - TcpDiscoveryNode crd = resolveCoordinator(); - - return crd != null ? crd.id() : null; + /** + * @return Size of topology snapshots history. + */ + public long getTopHistorySize() { + return topHistSize; } - /** {@inheritDoc} */ - @Nullable @Override public ClusterNode getNode(UUID nodeId) { - assert nodeId != null; - - UUID locNodeId0 = getLocalNodeId(); - - if (locNodeId0 != null && locNodeId0.equals(nodeId)) - // Return local node directly. - return locNode; + /** + * Sets size of topology snapshots history. Specified size should be greater than or equal to default size + * {@link #DFLT_TOP_HISTORY_SIZE}. + * + * @param topHistSize Size of topology snapshots history. 
+ */ + @IgniteSpiConfiguration(optional = true) + public TcpDiscoverySpi setTopHistorySize(int topHistSize) { + if (topHistSize < DFLT_TOP_HISTORY_SIZE) { + U.warn(log, "Topology history size should be greater than or equal to default size. " + + "Specified size will not be set [curSize=" + this.topHistSize + ", specifiedSize=" + topHistSize + + ", defaultSize=" + DFLT_TOP_HISTORY_SIZE + ']'); - TcpDiscoveryNode node = ring.node(nodeId); + return this; + } - if (node != null && !node.visible()) - return null; + this.topHistSize = topHistSize; - return node; + return this; } /** {@inheritDoc} */ - @Override public Collection<ClusterNode> getRemoteNodes() { - return F.upcast(ring.visibleRemoteNodes()); - } + @Override public void setNodeAttributes(Map<String, Object> attrs, IgniteProductVersion ver) { + assert locNodeAttrs == null; + assert locNodeVer == null; - /** {@inheritDoc} */ - @Override public Collection<Object> injectables() { - return F.<Object>asList(ipFinder); - } + if (log.isDebugEnabled()) { + log.debug("Node attributes to set: " + attrs); + log.debug("Node version to set: " + ver); + } - /** {@inheritDoc} */ - @Override public void spiStart(String gridName) throws IgniteSpiException { - spiStart0(false); + locNodeAttrs = attrs; + locNodeVer = ver; } /** - * Starts or restarts SPI after stop (to reconnect). - * - * @param restart {@code True} if SPI is restarted after stop. - * @throws IgniteSpiException If failed. + * @param srvPort Server port. */ - private void spiStart0(boolean restart) throws IgniteSpiException { - if (!restart) - // It is initial start. - onSpiStart(); - - synchronized (mux) { - spiState = DISCONNECTED; - } - - if (debugMode) { - if (!log.isInfoEnabled()) - throw new IgniteSpiException("Info log level should be enabled for TCP discovery to work " + - "in debug mode."); - - debugLog = new ConcurrentLinkedDeque<>(); - - U.quietAndWarn(log, "TCP discovery SPI is configured in debug mode."); - } - - // Clear addresses collections. 
- fromAddrs.clear(); - noResAddrs.clear(); - - sockTimeoutWorker = new SocketTimeoutWorker(); - sockTimeoutWorker.start(); - - msgWorker = new RingMessageWorker(); - msgWorker.start(); - - tcpSrvr = new TcpServer(); - + protected void initLocalNode(int srvPort, boolean addExtAddrAttr) { // Init local node. IgniteBiTuple<Collection<String>, Collection<String>> addrs; @@ -711,4565 +836,975 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov getLocalNodeId(), addrs.get1(), addrs.get2(), - tcpSrvr.port, + srvPort, metricsProvider, locNodeVer); - Collection<InetSocketAddress> extAddrs = addrRslvr == null ? null : - U.resolveAddresses(addrRslvr, F.flat(Arrays.asList(addrs.get1(), addrs.get2())), - locNode.discoveryPort()); - - if (extAddrs != null) - locNodeAttrs.put(createSpiAttributeName(ATTR_EXT_ADDRS), extAddrs); - - locNode.setAttributes(locNodeAttrs); - - locNode.local(true); - - locNodeAddrs = getNodeAddresses(locNode); - - if (log.isDebugEnabled()) - log.debug("Local node initialized: " + locNode); - - // Start TCP server thread after local node is initialized. 
- tcpSrvr.start(); - - ring.localNode(locNode); - - if (ipFinder.isShared()) - registerLocalNodeAddress(); - else { - if (F.isEmpty(ipFinder.getRegisteredAddresses())) - throw new IgniteSpiException("Non-shared IP finder must have IP addresses specified in " + - "GridTcpDiscoveryIpFinder.getRegisteredAddresses() configuration property " + - "(specify list of IP addresses in configuration)."); - - ipFinderHasLocAddr = ipFinderHasLocalAddress(); - } - - if (statsPrintFreq > 0 && log.isInfoEnabled()) { - statsPrinter = new StatisticsPrinter(); - statsPrinter.start(); - } - - stats.onJoinStarted(); - - joinTopology(); - - stats.onJoinFinished(); - - hbsSnd = new HeartbeatsSender(); - hbsSnd.start(); - - chkStatusSnd = new CheckStatusSender(); - chkStatusSnd.start(); - - if (ipFinder.isShared()) { - ipFinderCleaner = new IpFinderCleaner(); - ipFinderCleaner.start(); - } - - if (log.isDebugEnabled() && !restart) - log.debug(startInfo()); - - if (restart) - getSpiContext().registerPort(tcpSrvr.port, TCP); - } - - /** - * @throws IgniteSpiException If failed. - */ - @SuppressWarnings("BusyWait") - private void registerLocalNodeAddress() throws IgniteSpiException { - // Make sure address registration succeeded. - while (true) { - try { - ipFinder.initializeLocalAddresses(locNode.socketAddresses()); - - // Success. - break; - } - catch (IllegalStateException e) { - throw new IgniteSpiException("Failed to register local node address with IP finder: " + - locNode.socketAddresses(), e); - } - catch (IgniteSpiException e) { - LT.error(log, e, "Failed to register local node address in IP finder on start " + - "(retrying every 2000 ms)."); - } - - try { - U.sleep(2000); - } - catch (IgniteInterruptedCheckedException e) { - throw new IgniteSpiException("Thread has been interrupted.", e); - } - } - } - - /** - * @throws IgniteSpiException If failed. 
- */ - private void onSpiStart() throws IgniteSpiException { - startStopwatch(); - - assertParameter(ipFinder != null, "ipFinder != null"); - assertParameter(ipFinderCleanFreq > 0, "ipFinderCleanFreq > 0"); - assertParameter(locPort > 1023, "localPort > 1023"); - assertParameter(locPortRange >= 0, "localPortRange >= 0"); - assertParameter(locPort + locPortRange <= 0xffff, "locPort + locPortRange <= 0xffff"); - assertParameter(netTimeout > 0, "networkTimeout > 0"); - assertParameter(sockTimeout > 0, "sockTimeout > 0"); - assertParameter(ackTimeout > 0, "ackTimeout > 0"); - assertParameter(maxAckTimeout > ackTimeout, "maxAckTimeout > ackTimeout"); - assertParameter(reconCnt > 0, "reconnectCnt > 0"); - assertParameter(hbFreq > 0, "heartbeatFreq > 0"); - assertParameter(maxMissedHbs > 0, "maxMissedHeartbeats > 0"); - assertParameter(maxMissedClientHbs > 0, "maxMissedClientHeartbeats > 0"); - assertParameter(threadPri > 0, "threadPri > 0"); - assertParameter(statsPrintFreq >= 0, "statsPrintFreq >= 0"); - - try { - locHost = U.resolveLocalHost(locAddr); - } - catch (IOException e) { - throw new IgniteSpiException("Unknown local address: " + locAddr, e); - } - - if (log.isDebugEnabled()) { - log.debug(configInfo("localHost", locHost.getHostAddress())); - log.debug(configInfo("localPort", locPort)); - log.debug(configInfo("localPortRange", locPortRange)); - log.debug(configInfo("threadPri", threadPri)); - log.debug(configInfo("networkTimeout", netTimeout)); - log.debug(configInfo("sockTimeout", sockTimeout)); - log.debug(configInfo("ackTimeout", ackTimeout)); - log.debug(configInfo("maxAckTimeout", maxAckTimeout)); - log.debug(configInfo("reconnectCount", reconCnt)); - log.debug(configInfo("ipFinder", ipFinder)); - log.debug(configInfo("ipFinderCleanFreq", ipFinderCleanFreq)); - log.debug(configInfo("heartbeatFreq", hbFreq)); - log.debug(configInfo("maxMissedHeartbeats", maxMissedHbs)); - log.debug(configInfo("statsPrintFreq", statsPrintFreq)); - } - - // Warn on odd 
network timeout. - if (netTimeout < 3000) - U.warn(log, "Network timeout is too low (at least 3000 ms recommended): " + netTimeout); - - registerMBean(gridName, this, TcpDiscoverySpiMBean.class); - - if (ipFinder instanceof TcpDiscoveryMulticastIpFinder) { - TcpDiscoveryMulticastIpFinder mcastIpFinder = ((TcpDiscoveryMulticastIpFinder)ipFinder); - - if (mcastIpFinder.getLocalAddress() == null) - mcastIpFinder.setLocalAddress(locAddr); - } - } - - /** {@inheritDoc} */ - @Override public void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException { - super.onContextInitialized0(spiCtx); - - ctxInitLatch.countDown(); - - spiCtx.registerPort(tcpSrvr.port, TCP); - } - - /** {@inheritDoc} */ - @Override public IgniteSpiContext getSpiContext() { - if (ctxInitLatch.getCount() > 0) { - if (log.isDebugEnabled()) - log.debug("Waiting for context initialization."); - - try { - U.await(ctxInitLatch); - - if (log.isDebugEnabled()) - log.debug("Context has been initialized."); - } - catch (IgniteInterruptedCheckedException e) { - U.warn(log, "Thread has been interrupted while waiting for SPI context initialization.", e); - } - } - - return super.getSpiContext(); - } - - /** {@inheritDoc} */ - @Override public void spiStop() throws IgniteSpiException { - spiStop0(false); - } - - /** - * Stops SPI finally or stops SPI for restart. - * - * @param disconnect {@code True} if SPI is being disconnected. - * @throws IgniteSpiException If failed. - */ - private void spiStop0(boolean disconnect) throws IgniteSpiException { - if (ctxInitLatch.getCount() > 0) - // Safety. - ctxInitLatch.countDown(); - - if (log.isDebugEnabled()) { - if (disconnect) - log.debug("Disconnecting SPI."); - else - log.debug("Preparing to start local node stop procedure."); - } - - if (disconnect) { - synchronized (mux) { - spiState = DISCONNECTING; - } - } - - if (msgWorker != null && msgWorker.isAlive() && !disconnect) { - // Send node left message only if it is final stop. 
- msgWorker.addMessage(new TcpDiscoveryNodeLeftMessage(getLocalNodeId())); - - synchronized (mux) { - long threshold = U.currentTimeMillis() + netTimeout; - - long timeout = netTimeout; - - while (spiState != LEFT && timeout > 0) { - try { - mux.wait(timeout); + if (addExtAddrAttr) { + Collection<InetSocketAddress> extAddrs = addrRslvr == null ? null : + U.resolveAddresses(addrRslvr, F.flat(Arrays.asList(addrs.get1(), addrs.get2())), + locNode.discoveryPort()); - timeout = threshold - U.currentTimeMillis(); - } - catch (InterruptedException ignored) { - Thread.currentThread().interrupt(); + locNodeAddrs = new LinkedHashSet<>(); + locNodeAddrs.addAll(locNode.socketAddresses()); - break; - } - } + if (extAddrs != null) { + locNodeAttrs.put(createSpiAttributeName(ATTR_EXT_ADDRS), extAddrs); - if (spiState == LEFT) { - if (log.isDebugEnabled()) - log.debug("Verification for local node leave has been received from coordinator" + - " (continuing stop procedure)."); - } - else if (log.isInfoEnabled()) { - log.info("No verification for local node leave has been received from coordinator" + - " (will stop node anyway)."); - } + locNodeAddrs.addAll(extAddrs); } } - U.interrupt(tcpSrvr); - U.join(tcpSrvr, log); - - Collection<SocketReader> tmp; - - synchronized (mux) { - tmp = U.arrayList(readers); - } - - U.interrupt(tmp); - U.joinThreads(tmp, log); - - U.interrupt(hbsSnd); - U.join(hbsSnd, log); - - U.interrupt(chkStatusSnd); - U.join(chkStatusSnd, log); - - U.interrupt(ipFinderCleaner); - U.join(ipFinderCleaner, log); - - U.interrupt(msgWorker); - U.join(msgWorker, log); - - U.interrupt(sockTimeoutWorker); - U.join(sockTimeoutWorker, log); - - U.interrupt(statsPrinter); - U.join(statsPrinter, log); - - if (ipFinder != null) - ipFinder.close(); - - Collection<TcpDiscoveryNode> rmts = null; - - if (!disconnect) { - // This is final stop. 
- unregisterMBean(); - - if (log.isDebugEnabled()) - log.debug(stopInfo()); - } - else { - getSpiContext().deregisterPorts(); - - rmts = ring.visibleRemoteNodes(); - } - - long topVer = ring.topologyVersion(); - - ring.clear(); - - if (rmts != null && !rmts.isEmpty()) { - // This is restart/disconnection and remote nodes are not empty. - // We need to fire FAIL event for each. - DiscoverySpiListener lsnr = this.lsnr; - - if (lsnr != null) { - Set<ClusterNode> processed = new HashSet<>(); - - for (TcpDiscoveryNode n : rmts) { - assert n.visible(); - - processed.add(n); - - List<ClusterNode> top = U.arrayList(rmts, F.notIn(processed)); - - topVer++; - - Map<Long, Collection<ClusterNode>> hist = updateTopologyHistory(topVer, - Collections.unmodifiableList(top)); - - lsnr.onDiscovery(EVT_NODE_FAILED, topVer, n, top, hist, null); - } - } - } - - printStatistics(); - - stats.clear(); - - synchronized (mux) { - // Clear stored data. - leavingNodes.clear(); - failedNodes.clear(); - - spiState = DISCONNECTED; - } - } - - /** {@inheritDoc} */ - @Override protected void onContextDestroyed0() { - super.onContextDestroyed0(); - - if (ctxInitLatch.getCount() > 0) - // Safety. - ctxInitLatch.countDown(); - - getSpiContext().deregisterPorts(); - } - - /** - * @throws IgniteSpiException If any error occurs. - * @return {@code true} if IP finder contains local address. - */ - private boolean ipFinderHasLocalAddress() throws IgniteSpiException { - for (InetSocketAddress locAddr : locNodeAddrs) { - for (InetSocketAddress addr : registeredAddresses()) - try { - int port = addr.getPort(); - - InetSocketAddress resolved = addr.isUnresolved() ? 
- new InetSocketAddress(InetAddress.getByName(addr.getHostName()), port) : - new InetSocketAddress(addr.getAddress(), port); - - if (resolved.equals(locAddr)) - return true; - } - catch (UnknownHostException e) { - onException(e.getMessage(), e); - } - } - - return false; - } - - /** {@inheritDoc} */ - @Override public boolean pingNode(UUID nodeId) { - assert nodeId != null; - - if (log.isDebugEnabled()) - log.debug("Ping node. NodeId: [" + nodeId + "]."); - - if (nodeId == getLocalNodeId()) - return true; - - TcpDiscoveryNode node = ring.node(nodeId); - - if (node == null || !node.visible()) - return false; - - boolean res = pingNode(node); - - if (!res && !node.isClient()) { - LT.warn(log, null, "Failed to ping node (status check will be initiated): " + nodeId); - - msgWorker.addMessage(new TcpDiscoveryStatusCheckMessage(locNode, node.id())); - } - - return res; - } - - /** - * Pings the remote node to see if it's alive. - * - * @param node Node. - * @return {@code True} if ping succeeds. - */ - private boolean pingNode(TcpDiscoveryNode node) { - assert node != null; - - if (node.id().equals(getLocalNodeId())) - return true; - - UUID clientNodeId = null; - - if (node.isClient()) { - clientNodeId = node.id(); - - node = ring.node(node.clientRouterNodeId()); - - if (node == null || !node.visible()) - return false; - } - - for (InetSocketAddress addr : getNodeAddresses(node, U.sameMacs(locNode, node))) { - try { - // ID returned by the node should be the same as ID of the parameter for ping to succeed. 
- IgniteBiTuple<UUID, Boolean> t = pingNode(addr, clientNodeId); - - return node.id().equals(t.get1()) && (clientNodeId == null || t.get2()); - } - catch (IgniteCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to ping node [node=" + node + ", err=" + e.getMessage() + ']'); - - onException("Failed to ping node [node=" + node + ", err=" + e.getMessage() + ']', e); - // continue; - } - } - - return false; - } - - /** - * Pings the remote node by its address to see if it's alive. - * - * @param addr Address of the node. - * @return ID of the remote node if node alive. - * @throws IgniteSpiException If an error occurs. - */ - private IgniteBiTuple<UUID, Boolean> pingNode(InetSocketAddress addr, @Nullable UUID clientNodeId) - throws IgniteCheckedException { - assert addr != null; - - UUID locNodeId = getLocalNodeId(); - - if (F.contains(locNodeAddrs, addr)) - return F.t(getLocalNodeId(), false); - - GridFutureAdapter<IgniteBiTuple<UUID, Boolean>> fut = new GridFutureAdapter<>(); - - IgniteInternalFuture<IgniteBiTuple<UUID, Boolean>> oldFut = pingMap.putIfAbsent(addr, fut); - - if (oldFut != null) - return oldFut.get(); - else { - Collection<Throwable> errs = null; - - try { - Socket sock = null; - - for (int i = 0; i < reconCnt; i++) { - try { - if (addr.isUnresolved()) - addr = new InetSocketAddress(InetAddress.getByName(addr.getHostName()), addr.getPort()); - - long tstamp = U.currentTimeMillis(); - - sock = openSocket(addr); - - writeToSocket(sock, new TcpDiscoveryPingRequest(locNodeId, clientNodeId)); - - TcpDiscoveryPingResponse res = readMessage(sock, null, netTimeout); - - if (locNodeId.equals(res.creatorNodeId())) { - if (log.isDebugEnabled()) - log.debug("Ping response from local node: " + res); - - break; - } - - stats.onClientSocketInitialized(U.currentTimeMillis() - tstamp); - - IgniteBiTuple<UUID, Boolean> t = F.t(res.creatorNodeId(), res.clientExists()); - - fut.onDone(t); - - return t; - } - catch (IOException | IgniteCheckedException 
e) { - if (errs == null) - errs = new ArrayList<>(); - - errs.add(e); - } - finally { - U.closeQuiet(sock); - } - } - } - catch (Throwable t) { - fut.onDone(t); - - if (t instanceof Error) - throw t; - - throw U.cast(t); - } - finally { - if (!fut.isDone()) - fut.onDone(U.exceptionWithSuppressed("Failed to ping node by address: " + addr, errs)); - - boolean b = pingMap.remove(addr, fut); - - assert b; - } - - return fut.get(); - } - } - - /** {@inheritDoc} */ - @Override public void disconnect() throws IgniteSpiException { - spiStop0(true); - } - - /** {@inheritDoc} */ - @Override public void setAuthenticator(DiscoverySpiNodeAuthenticator nodeAuth) { - this.nodeAuth = nodeAuth; - } - - /** {@inheritDoc} */ - @Override public void sendCustomEvent(Serializable evt) { - try { - byte[] msgBytes; - - msgBytes = marsh.marshal(evt); - - msgWorker.addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt, msgBytes)); - } - catch (IgniteCheckedException e) { - throw new IgniteSpiException("Failed to marshal custom event: " + evt, e); - } - } - - /** {@inheritDoc} */ - @Override public void failNode(UUID nodeId) { - ClusterNode node = ring.node(nodeId); - - if (node != null) { - TcpDiscoveryNodeFailedMessage msg = new TcpDiscoveryNodeFailedMessage(getLocalNodeId(), - node.id(), node.order()); - - msgWorker.addMessage(msg); - } - } - - /** - * Tries to join this node to topology. - * - * @throws IgniteSpiException If any error occurs. - */ - private void joinTopology() throws IgniteSpiException { - synchronized (mux) { - assert spiState == CONNECTING || spiState == DISCONNECTED; - - spiState = CONNECTING; - } - - SecurityCredentials locCred = (SecurityCredentials)locNode.getAttributes() - .get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS); - - // Marshal credentials for backward compatibility and security. 
- marshalCredentials(locNode); - - while (true) { - if (!sendJoinRequestMessage()) { - if (log.isDebugEnabled()) - log.debug("Join request message has not been sent (local node is the first in the topology)."); - - if (nodeAuth != null) { - // Authenticate local node. - try { - SecurityContext subj = nodeAuth.authenticateNode(locNode, locCred); - - if (subj == null) - throw new IgniteSpiException("Authentication failed for local node: " + locNode.id()); - - Map<String, Object> attrs = new HashMap<>(locNode.attributes()); - - attrs.put(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT, - ignite.configuration().getMarshaller().marshal(subj)); - attrs.remove(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS); - - locNode.setAttributes(attrs); - } - catch (IgniteException | IgniteCheckedException e) { - throw new IgniteSpiException("Failed to authenticate local node (will shutdown local node).", e); - } - } - - locNode.order(1); - locNode.internalOrder(1); - - gridStartTime = U.currentTimeMillis(); - - locNode.visible(true); - - ring.clear(); - - ring.topologyVersion(1); - - synchronized (mux) { - topHist.clear(); - - spiState = CONNECTED; - - mux.notifyAll(); - } - - notifyDiscovery(EVT_NODE_JOINED, 1, locNode); - - break; - } - - if (log.isDebugEnabled()) - log.debug("Join request message has been sent (waiting for coordinator response)."); - - synchronized (mux) { - long threshold = U.currentTimeMillis() + netTimeout; - - long timeout = netTimeout; - - while (spiState == CONNECTING && timeout > 0) { - try { - mux.wait(timeout); - - timeout = threshold - U.currentTimeMillis(); - } - catch (InterruptedException ignored) { - Thread.currentThread().interrupt(); - - throw new IgniteSpiException("Thread has been interrupted."); - } - } - - if (spiState == CONNECTED) - break; - else if (spiState == DUPLICATE_ID) - throw duplicateIdError((TcpDiscoveryDuplicateIdMessage)joinRes.get()); - else if (spiState == AUTH_FAILED) - throw 
authenticationFailedError((TcpDiscoveryAuthFailedMessage)joinRes.get()); - else if (spiState == CHECK_FAILED) - throw checkFailedError((TcpDiscoveryCheckFailedMessage)joinRes.get()); - else if (spiState == LOOPBACK_PROBLEM) { - TcpDiscoveryLoopbackProblemMessage msg = (TcpDiscoveryLoopbackProblemMessage)joinRes.get(); - - boolean locHostLoopback = locHost.isLoopbackAddress(); - - String firstNode = locHostLoopback ? "local" : "remote"; - - String secondNode = locHostLoopback ? "remote" : "local"; - - throw new IgniteSpiException("Failed to add node to topology because " + firstNode + - " node is configured to use loopback address, but " + secondNode + " node is not " + - "(consider changing 'localAddress' configuration parameter) " + - "[locNodeAddrs=" + U.addressesAsString(locNode) + ", rmtNodeAddrs=" + - U.addressesAsString(msg.addresses(), msg.hostNames()) + ']'); - } - else - LT.warn(log, null, "Node has not been connected to topology and will repeat join process. " + - "Check remote nodes logs for possible error messages. " + - "Note that large topology may require significant time to start. " + - "Increase 'TcpDiscoverySpi.networkTimeout' configuration property " + - "if getting this message on the starting nodes [networkTimeout=" + netTimeout + ']'); - } - } - - assert locNode.order() != 0; - assert locNode.internalOrder() != 0; - - if (log.isDebugEnabled()) - log.debug("Discovery SPI has been connected to topology with order: " + locNode.internalOrder()); - } - - /** - * Tries to send join request message to a random node presenting in topology. - * Address is provided by {@link TcpDiscoveryIpFinder} and message is - * sent to first node connection succeeded to. - * - * @return {@code true} if send succeeded. - * @throws IgniteSpiException If any error occurs. 
- */ - @SuppressWarnings({"BusyWait"}) - private boolean sendJoinRequestMessage() throws IgniteSpiException { - TcpDiscoveryAbstractMessage joinReq = new TcpDiscoveryJoinRequestMessage(locNode, - collectExchangeData(getLocalNodeId())); - - // Time when it has been detected, that addresses from IP finder do not respond. - long noResStart = 0; - - while (true) { - Collection<InetSocketAddress> addrs = resolvedAddresses(); - - if (F.isEmpty(addrs)) - return false; - - boolean retry = false; - Collection<Exception> errs = new ArrayList<>(); - - try (SocketMultiConnector multiConnector = new SocketMultiConnector(addrs, 2)) { - GridTuple3<InetSocketAddress, Socket, Exception> tuple; - - while ((tuple = multiConnector.next()) != null) { - InetSocketAddress addr = tuple.get1(); - Socket sock = tuple.get2(); - Exception ex = tuple.get3(); - - if (ex == null) { - assert sock != null; - - try { - Integer res = sendMessageDirectly(joinReq, addr, sock); - - assert res != null; - - noResAddrs.remove(addr); - - // Address is responsive, reset period start. - noResStart = 0; - - switch (res) { - case RES_WAIT: - // Concurrent startup, try sending join request again or wait if no success. - retry = true; - - break; - case RES_OK: - if (log.isDebugEnabled()) - log.debug("Join request message has been sent to address [addr=" + addr + - ", req=" + joinReq + ']'); - - // Join request sending succeeded, wait for response from topology. - return true; - - default: - // Concurrent startup, try next node. 
- if (res == RES_CONTINUE_JOIN) { - if (!fromAddrs.contains(addr)) - retry = true; - } - else { - if (log.isDebugEnabled()) - log.debug("Unexpected response to join request: " + res); - - retry = true; - } - - break; - } - } - catch (IgniteSpiException e) { - e.printStackTrace(); - - ex = e; - } - } - - if (ex != null) { - errs.add(ex); - - if (log.isDebugEnabled()) { - IOException ioe = X.cause(ex, IOException.class); - - log.debug("Failed to send join request message [addr=" + addr + - ", msg=" + ioe != null ? ioe.getMessage() : ex.getMessage() + ']'); - - onException("Failed to send join request message [addr=" + addr + - ", msg=" + ioe != null ? ioe.getMessage() : ex.getMessage() + ']', ioe); - } - - noResAddrs.add(addr); - } - } - } - - if (retry) { - if (log.isDebugEnabled()) - log.debug("Concurrent discovery SPI start has been detected (local node should wait)."); - - try { - U.sleep(2000); - } - catch (IgniteInterruptedCheckedException e) { - throw new IgniteSpiException("Thread has been interrupted.", e); - } - } - else if (!ipFinder.isShared() && !ipFinderHasLocAddr) { - IgniteCheckedException e = null; - - if (!errs.isEmpty()) { - e = new IgniteCheckedException("Multiple connection attempts failed."); - - for (Exception err : errs) - e.addSuppressed(err); - } - - if (e != null && X.hasCause(e, ConnectException.class)) - LT.warn(log, null, "Failed to connect to any address from IP finder " + - "(make sure IP finder addresses are correct and firewalls are disabled on all host machines): " + - addrs); - - if (joinTimeout > 0) { - if (noResStart == 0) - noResStart = U.currentTimeMillis(); - else if (U.currentTimeMillis() - noResStart > joinTimeout) - throw new IgniteSpiException( - "Failed to connect to any address from IP finder within join timeout " + - "(make sure IP finder addresses are correct, and operating system firewalls are disabled " + - "on all host machines, or consider increasing 'joinTimeout' configuration property): " + - addrs, e); - } - - 
try { - U.sleep(2000); - } - catch (IgniteInterruptedCheckedException ex) { - throw new IgniteSpiException("Thread has been interrupted.", ex); - } - } - else - break; - } - - return false; - } - - /** - * Establishes connection to an address, sends message and returns the response (if any). - * - * @param msg Message to send. - * @param addr Address to send message to. - * @return Response read from the recipient or {@code null} if no response is supposed. - * @throws IgniteSpiException If an error occurs. - */ - @Nullable private Integer sendMessageDirectly(TcpDiscoveryAbstractMessage msg, InetSocketAddress addr, Socket sock) - throws IgniteSpiException { - assert msg != null; - assert addr != null; - - Collection<Throwable> errs = null; - - long ackTimeout0 = ackTimeout; - - int connectAttempts = 1; - - boolean joinReqSent = false; - - UUID locNodeId = getLocalNodeId(); - - for (int i = 0; i < reconCnt; i++) { - // Need to set to false on each new iteration, - // since remote node may leave in the middle of the first iteration. - joinReqSent = false; - - boolean openSock = false; - - try { - long tstamp = U.currentTimeMillis(); - - if (sock == null) - sock = openSocket(addr); - - openSock = true; - - // Handshake. - writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId)); - - TcpDiscoveryHandshakeResponse res = readMessage(sock, null, ackTimeout0); - - if (locNodeId.equals(res.creatorNodeId())) { - if (log.isDebugEnabled()) - log.debug("Handshake response from local node: " + res); - - break; - } - - stats.onClientSocketInitialized(U.currentTimeMillis() - tstamp); - - // Send message. 
- tstamp = U.currentTimeMillis(); - - writeToSocket(sock, msg); - - stats.onMessageSent(msg, U.currentTimeMillis() - tstamp); - - if (debugMode) - debugLog("Message has been sent directly to address [msg=" + msg + ", addr=" + addr + - ", rmtNodeId=" + res.creatorNodeId() + ']'); - - if (log.isDebugEnabled()) - log.debug("Message has been sent directly to address [msg=" + msg + ", addr=" + addr + - ", rmtNodeId=" + res.creatorNodeId() + ']'); - - // Connection has been established, but - // join request may not be unmarshalled on remote host. - // E.g. due to class not found issue. - joinReqSent = msg instanceof TcpDiscoveryJoinRequestMessage; - - return readReceipt(sock, ackTimeout0); - } - catch (ClassCastException e) { - // This issue is rarely reproducible on AmazonEC2, but never - // on dedicated machines. - if (log.isDebugEnabled()) - U.error(log, "Class cast exception on direct send: " + addr, e); - - onException("Class cast exception on direct send: " + addr, e); - - if (errs == null) - errs = new ArrayList<>(); - - errs.add(e); - } - catch (IOException | IgniteCheckedException e) { - if (log.isDebugEnabled()) - log.error("Exception on direct send: " + e.getMessage(), e); - - onException("Exception on direct send: " + e.getMessage(), e); - - if (errs == null) - errs = new ArrayList<>(); - - errs.add(e); - - if (!openSock) { - // Reconnect for the second time, if connection is not established. - if (connectAttempts < 2) { - connectAttempts++; - - continue; - } - - break; // Don't retry if we can not establish connection. 
- } - - if (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class)) { - ackTimeout0 *= 2; - - if (!checkAckTimeout(ackTimeout0)) - break; - } - } - finally { - U.closeQuiet(sock); - - sock = null; - } - } - - if (joinReqSent) { - if (log.isDebugEnabled()) - log.debug("Join request has been sent, but receipt has not been read (returning RES_WAIT)."); - - // Topology will not include this node, - // however, warning on timed out join will be output. - return RES_OK; - } - - throw new IgniteSpiException( - "Failed to send message to address [addr=" + addr + ", msg=" + msg + ']', - U.exceptionWithSuppressed("Failed to send message to address " + - "[addr=" + addr + ", msg=" + msg + ']', errs)); - } - - /** - * Marshalls credentials with discovery SPI marshaller (will replace attribute value). - * - * @param node Node to marshall credentials for. - * @throws IgniteSpiException If marshalling failed. - */ - private void marshalCredentials(TcpDiscoveryNode node) throws IgniteSpiException { - try { - // Use security-unsafe getter. - Map<String, Object> attrs = new HashMap<>(node.getAttributes()); - - attrs.put(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS, - marsh.marshal(attrs.get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS))); - - node.setAttributes(attrs); - } - catch (IgniteCheckedException e) { - throw new IgniteSpiException("Failed to marshal node security credentials: " + node.id(), e); - } - } - - /** - * Unmarshalls credentials with discovery SPI marshaller (will not replace attribute value). - * - * @param node Node to unmarshall credentials for. - * @return Security credentials. - * @throws IgniteSpiException If unmarshal fails. 
- */ - private SecurityCredentials unmarshalCredentials(TcpDiscoveryNode node) throws IgniteSpiException { - try { - byte[] credBytes = (byte[])node.getAttributes().get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS); - - if (credBytes == null) - return null; - - return marsh.unmarshal(credBytes, null); - } - catch (IgniteCheckedException e) { - throw new IgniteSpiException("Failed to unmarshal node security credentials: " + node.id(), e); - } - } - - /** - * @param ackTimeout Acknowledgement timeout. - * @return {@code True} if acknowledgement timeout is less or equal to - * maximum acknowledgement timeout, {@code false} otherwise. - */ - private boolean checkAckTimeout(long ackTimeout) { - if (ackTimeout > maxAckTimeout) { - LT.warn(log, null, "Acknowledgement timeout is greater than maximum acknowledgement timeout " + - "(consider increasing 'maxAckTimeout' configuration property) " + - "[ackTimeout=" + ackTimeout + ", maxAckTimeout=" + maxAckTimeout + ']'); - - return false; - } - - return true; - } - - /** - * Notify external listener on discovery event. - * - * @param type Discovery event type. See {@link DiscoveryEvent} for more details. - * @param topVer Topology version. - * @param node Remote node this event is connected with. 
- */ - private void notifyDiscovery(int type, long topVer, TcpDiscoveryNode node) { - assert type > 0; - assert node != null; - - DiscoverySpiListener lsnr = this.lsnr; - - TcpDiscoverySpiState spiState = spiStateCopy(); - - if (lsnr != null && node.visible() && (spiState == CONNECTED || spiState == DISCONNECTING)) { - if (log.isDebugEnabled()) - log.debug("Discovery notification [node=" + node + ", spiState=" + spiState + - ", type=" + U.gridEventName(type) + ", topVer=" + topVer + ']'); - - Collection<ClusterNode> top = F.<TcpDiscoveryNode, ClusterNode>upcast(ring.visibleNodes()); - - Map<Long, Collection<ClusterNode>> hist = updateTopologyHistory(topVer, top); - - lsnr.onDiscovery(type, topVer, node, top, hist, null); - } - else if (log.isDebugEnabled()) - log.debug("Skipped discovery notification [node=" + node + ", spiState=" + spiState + - ", type=" + U.gridEventName(type) + ", topVer=" + topVer + ']'); - } - - /** - * Update topology history with new topology snapshots. - * - * @param topVer Topology version. - * @param top Topology snapshot. - * @return Copy of updated topology history. - */ - @Nullable private Map<Long, Collection<ClusterNode>> updateTopologyHistory(long topVer, Collection<ClusterNode> top) { - synchronized (mux) { - if (topHist.containsKey(topVer)) - return null; - - topHist.put(topVer, top); - - while (topHist.size() > topHistSize) - topHist.remove(topHist.firstKey()); - - if (log.isDebugEnabled()) - log.debug("Added topology snapshot to history, topVer=" + topVer + ", historySize=" + topHist.size()); - - return new TreeMap<>(topHist); - } - } - - /** - * @param msg Error message. - * @param e Exception. - */ - private void onException(String msg, Exception e){ - getExceptionRegistry().onException(msg, e); - } - - /** - * @param node Node. - * @return {@link LinkedHashSet} of internal and external addresses of provided node. - * Internal addresses placed before external addresses. 
- */ - @SuppressWarnings("TypeMayBeWeakened") - private LinkedHashSet<InetSocketAddress> getNodeAddresses(TcpDiscoveryNode node) { - LinkedHashSet<InetSocketAddress> res = new LinkedHashSet<>(node.socketAddresses()); - - Collection<InetSocketAddress> extAddrs = node.attribute(createSpiAttributeName(ATTR_EXT_ADDRS)); - - if (extAddrs != null) - res.addAll(extAddrs); - - return res; - } - - /** - * @param node Node. - * @param sameHost Same host flag. - * @return {@link LinkedHashSet} of internal and external addresses of provided node. - * Internal addresses placed before external addresses. - * Internal addresses will be sorted with {@code inetAddressesComparator(sameHost)}. - */ - @SuppressWarnings("TypeMayBeWeakened") - private LinkedHashSet<InetSocketAddress> getNodeAddresses(TcpDiscoveryNode node, boolean sameHost) { - List<InetSocketAddress> addrs = U.arrayList(node.socketAddresses()); - - Collections.sort(addrs, U.inetAddressesComparator(sameHost)); - - LinkedHashSet<InetSocketAddress> res = new LinkedHashSet<>(addrs); - - Collection<InetSocketAddress> extAddrs = node.attribute(createSpiAttributeName(ATTR_EXT_ADDRS)); - - if (extAddrs != null) - res.addAll(extAddrs); - - return res; - } - - /** - * Checks whether local node is coordinator. Nodes that are leaving or failed - * (but are still in topology) are removed from search. - * - * @return {@code true} if local node is coordinator. - */ - private boolean isLocalNodeCoordinator() { - synchronized (mux) { - boolean crd = spiState == CONNECTED && locNode.equals(resolveCoordinator()); - - if (crd) - stats.onBecomingCoordinator(); - - return crd; - } - } - - /** - * @return Spi state copy. - */ - private TcpDiscoverySpiState spiStateCopy() { - TcpDiscoverySpiState state; - - synchronized (mux) { - state = spiState; - } - - return state; - } - - /** - * Resolves coordinator. Nodes that are leaving or failed (but are still in - * topology) are removed from search. 
- * - * @return Coordinator node or {@code null} if there are no coordinator - * (i.e. local node is the last one and is currently stopping). - */ - @Nullable private TcpDiscoveryNode resolveCoordinator() { - return resolveCoordinator(null); - } - - /** - * Resolves coordinator. Nodes that are leaving or failed (but are still in - * topology) are removed from search as well as provided filter. - * - * @param filter Nodes to exclude when resolving coordinator (optional). - * @return Coordinator node or {@code null} if there are no coordinator - * (i.e. local node is the last one and is currently stopping). - */ - @Nullable private TcpDiscoveryNode resolveCoordinator( - @Nullable Collection<TcpDiscoveryNode> filter) { - synchronized (mux) { - Collection<TcpDiscoveryNode> excluded = F.concat(false, failedNodes, leavingNodes); - - if (!F.isEmpty(filter)) - excluded = F.concat(false, excluded, filter); - - return ring.coordinator(excluded); - } - } - - /** - * Prints SPI statistics. - */ - private void printStatistics() { - if (log.isInfoEnabled() && statsPrintFreq > 0) { - int failedNodesSize; - int leavingNodesSize; - - synchronized (mux) { - failedNodesSize = failedNodes.size(); - leavingNodesSize = leavingNodes.size(); - } - - Runtime runtime = Runtime.getRuntime(); - - TcpDiscoveryNode coord = resolveCoordinator(); - - log.info("Discovery SPI statistics [statistics=" + stats + ", spiState=" + spiStateCopy() + - ", coord=" + coord + - ", topSize=" + ring.allNodes().size() + - ", leavingNodesSize=" + leavingNodesSize + ", failedNodesSize=" + failedNodesSize + - ", msgWorker.queue.size=" + (msgWorker != null ? msgWorker.queueSize() : "N/A") + - ", lastUpdate=" + (locNode != null ? U.format(locNode.lastUpdateTime()) : "N/A") + - ", heapFree=" + runtime.freeMemory() / (1024 * 1024) + - "M, heapTotal=" + runtime.maxMemory() / (1024 * 1024) + "M]"); - } - } - - /** - * @param msg Message to prepare. - * @param destNodeId Destination node ID. 
- * @param msgs Messages to include. - * @param discardMsgId Discarded message ID. - */ - private void prepareNodeAddedMessage(TcpDiscoveryAbstractMessage msg, UUID destNodeId, - @Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable IgniteUuid discardMsgId) { - assert destNodeId != null; - - if (msg instanceof TcpDiscoveryNodeAddedMessage) { - TcpDiscoveryNodeAddedMessage nodeAddedMsg = (TcpDiscoveryNodeAddedMessage)msg; - - TcpDiscoveryNode node = nodeAddedMsg.node(); - - if (node.id().equals(destNodeId)) { - Collection<TcpDiscoveryNode> allNodes = ring.allNodes(); - Collection<TcpDiscoveryNode> topToSend = new ArrayList<>(allNodes.size()); - - for (TcpDiscoveryNode n0 : allNodes) { - assert n0.internalOrder() != 0 : n0; - - // Skip next node and nodes added after next - // in case this message is resent due to failures/leaves. - // There will be separate messages for nodes with greater - // internal order. - if (n0.internalOrder() < nodeAddedMsg.node().internalOrder()) - topToSend.add(n0); - } - - nodeAddedMsg.topology(topToSend); - nodeAddedMsg.messages(msgs, discardMsgId); - - Map<Long, Collection<ClusterNode>> hist; - - synchronized (mux) { - hist = new TreeMap<>(topHist); - } - - nodeAddedMsg.topologyHistory(hist); - } - } - } - - /** - * @param msg Message to clear. - */ - private void clearNodeAddedMessage(TcpDiscoveryAbstractMessage msg) { - if (msg instanceof TcpDiscoveryNodeAddedMessage) { - // Nullify topology before registration. - TcpDiscoveryNodeAddedMessage nodeAddedMsg = (TcpDiscoveryNodeAddedMessage)msg; - - nodeAddedMsg.topology(null); - nodeAddedMsg.topologyHistory(null); - nodeAddedMsg.messages(null, null); - } - } - - /** - * <strong>FOR TEST ONLY!!!</strong> - * <p> - * Simulates this node failure by stopping service threads. So, node will become - * unresponsive. - * <p> - * This method is intended for test purposes only. 
- */ - void simulateNodeFailure() { - U.warn(log, "Simulating node failure: " + getLocalNodeId()); - - U.interrupt(tcpSrvr); - U.join(tcpSrvr, log); - - U.interrupt(hbsSnd); - U.join(hbsSnd, log); - - U.interrupt(chkStatusSnd); - U.join(chkStatusSnd, log); - - U.interrupt(ipFinderCleaner); - U.join(ipFinderCleaner, log); - - Collection<SocketReader> tmp; - - synchronized (mux) { - tmp = U.arrayList(readers); - } - - U.interrupt(tmp); - U.joinThreads(tmp, log); - - U.interrupt(msgWorker); - U.join(msgWorker, log); - - U.interrupt(statsPrinter); - U.join(statsPrinter, log); - } - - /** - * <strong>FOR TEST ONLY!!!</strong> - * <p> - * Simulates situation when next node is still alive but is bypassed - * since it has been excluded from the ring, possibly, due to short time - * network problems. - * <p> - * This method is intended for test purposes only. - */ - void forceNextNodeFailure() { - U.warn(log, "Next node will be forcibly failed (if any)."); - - TcpDiscoveryNode next; - - synchronized (mux) { - next = ring.nextNode(failedNodes); - } - - if (next != null) - msgWorker.addMessage(new TcpDiscoveryNodeFailedMessage(getLocalNodeId(), next.id(), - next.internalOrder())); - } - - /** - * <strong>FOR TEST ONLY!!!</strong> - */ - public void addSendMessageListener(IgniteInClosure<TcpDiscoveryAbstractMessage> msg) { - sendMsgLsnrs.add(msg); - } - - /** - * <strong>FOR TEST ONLY!!!</strong> - */ - public void removeSendMessageListener(IgniteInClosure<TcpDiscoveryAbstractMessage> msg) { - sendMsgLsnrs.remove(msg); - } - - /** - * <strong>FOR TEST ONLY!!!</strong> - * <p> - * This method is intended for test purposes only. - * - * @return Nodes ring. - */ - TcpDiscoveryNodesRing ring() { - return ring; - } - - /** {@inheritDoc} */ - @Override public void dumpDebugInfo() { - dumpDebugInfo(log); - } - - /** - * @param log Logger. 
- */ - public void dumpDebugInfo(IgniteLogger log) { - if (!debugMode) { - U.quietAndWarn(log, "Failed to dump debug info (discovery SPI was not configured " + - "in debug mode, consider setting 'debugMode' configuration property to 'true')."); - - return; - } - - assert log.isInfoEnabled(); - - synchronized (mux) { - StringBuilder b = new StringBuilder(U.nl()); - - b.append(">>>").append(U.nl()); - b.append(">>>").append("Dumping discovery SPI debug info.").append(U.nl()); - b.append(">>>").append(U.nl()); - - b.append("Local node ID: ").append(getLocalNodeId()).append(U.nl()).append(U.nl()); - b.append("Local node: ").append(locNode).append(U.nl()).append(U.nl()); - b.append("SPI state: ").append(spiState).append(U.nl()).append(U.nl()); - - b.append("Internal threads: ").append(U.nl()); - - b.append(" Message worker: ").append(threadStatus(msgWorker)).append(U.nl()); - b.append(" Check status sender: ").append(threadStatus(chkStatusSnd)).append(U.nl()); - b.append(" HB sender: ").append(threadStatus(hbsSnd)).append(U.nl()); - b.append(" Socket timeout worker: ").append(threadStatus(sockTimeoutWorker)).append(U.nl()); - b.append(" IP finder cleaner: ").append(threadStatus(ipFinderCleaner)).append(U.nl()); - b.append(" Stats printer: ").append(threadStatus(statsPrinter)).append(U.nl()); - - b.append(U.nl()); - - b.append("Socket readers: ").append(U.nl()); - - for (SocketReader rdr : readers) - b.append(" ").append(rdr).append(U.nl()); - - b.append(U.nl()); - - b.append("In-memory log messages: ").append(U.nl()); - - for (String msg : debugLog) - b.append(" ").append(msg).append(U.nl()); - - b.append(U.nl()); - - b.append("Leaving nodes: ").append(U.nl()); - - for (TcpDiscoveryNode node : leavingNodes) - b.append(" ").append(node.id()).append(U.nl()); - - b.append(U.nl()); - - b.append("Failed nodes: ").append(U.nl()); - - for (TcpDiscoveryNode node : failedNodes) - b.append(" ").append(node.id()).append(U.nl()); - - b.append(U.nl()); - - b.append("Stats: 
").append(stats).append(U.nl()); - - U.quietAndInfo(log, b.toString()); - } - } - - /** - * @param msg Message. - */ - private void debugLog(String msg) { - assert debugMode; - - String msg0 = new SimpleDateFormat("[HH:mm:ss,SSS]").format(new Date(System.currentTimeMillis())) + - '[' + Thread.currentThread().getName() + "][" + getLocalNodeId() + - "-" + locNode.internalOrder() + "] " + - msg; - - debugLog.add(msg0); - - int delta = debugLog.size() - debugMsgHist; - - for (int i = 0; i < delta && debugLog.size() > debugMsgHist; i++) - debugLog.poll(); - } - - /** - * @param msg Message. - * @return {@code True} if recordable in debug mode. - */ - private boolean recordable(TcpDiscoveryAbstractMessage msg) { - return !(msg instanceof TcpDiscoveryHeartbeatMessage) && - !(msg instanceof TcpDiscoveryStatusCheckMessage) && - !(msg instanceof TcpDiscoveryDiscardMessage); - } - - /** - * @param t Thread. - * @return Status as string. - */ - private String threadStatus(Thread t) { - if (t == null) - return "N/A"; - - return t.isAlive() ? "alive" : "dead"; - } - - /** - * Checks if two given {@link SecurityPermi
<TRUNCATED>