http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
new file mode 100644
index 0000000..5377e18
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
@@ -0,0 +1,996 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.marshaller.*;
+import org.apache.ignite.marshaller.jdk.*;
+import org.apache.ignite.product.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.*;
+import org.gridgain.grid.*;
+import org.apache.ignite.spi.discovery.*;
+import org.apache.ignite.spi.discovery.tcp.internal.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.messages.*;
+import org.gridgain.grid.util.*;
+import org.gridgain.grid.util.io.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoverySpiState.*;
+
+/**
+ * Base class for TCP discovery SPIs.
+ * <p>
+ * Holds the configuration common to TCP discovery SPIs (local address, IP finder, timeouts,
+ * heartbeat frequency, thread priority, topology history size) and socket I/O helpers whose
+ * writes are bounded by {@link #sockTimeout} via a {@link SocketTimeoutWorker}.
+ */
+abstract class TcpDiscoverySpiAdapter extends IgniteSpiAdapter implements DiscoverySpi {
+    /** Default port to listen (value is <tt>47500</tt>). */
+    public static final int DFLT_PORT = 47500;
+
+    /** Default socket operations timeout in milliseconds (value is <tt>2,000ms</tt>). */
+    public static final long DFLT_SOCK_TIMEOUT = 2000;
+
+    /** Default timeout for receiving message acknowledgement in milliseconds (value is <tt>5,000ms</tt>). */
+    public static final long DFLT_ACK_TIMEOUT = 5000;
+
+    /** Default network timeout in milliseconds (value is <tt>5,000ms</tt>). */
+    public static final long DFLT_NETWORK_TIMEOUT = 5000;
+
+    /** Default value for thread priority (value is <tt>10</tt>). */
+    public static final int DFLT_THREAD_PRI = 10;
+
+    /** Default heartbeat messages issuing frequency (value is <tt>2,000ms</tt>). */
+    public static final long DFLT_HEARTBEAT_FREQ = 2000;
+
+    /** Default size of topology snapshots history. */
+    public static final int DFLT_TOP_HISTORY_SIZE = 1000;
+
+    /** Response OK. */
+    protected static final int RES_OK = 1;
+
+    /** Response CONTINUE JOIN. */
+    protected static final int RES_CONTINUE_JOIN = 100;
+
+    /** Response WAIT. */
+    protected static final int RES_WAIT = 200;
+
+    /** Local address. */
+    protected String locAddr;
+
+    /** IP finder. */
+    protected TcpDiscoveryIpFinder ipFinder;
+
+    /** Socket operations timeout. */
+    protected long sockTimeout = DFLT_SOCK_TIMEOUT;
+
+    /** Message acknowledgement timeout. */
+    protected long ackTimeout = DFLT_ACK_TIMEOUT;
+
+    /** Network timeout. */
+    protected long netTimeout = DFLT_NETWORK_TIMEOUT;
+
+    /** Thread priority for all threads started by SPI. */
+    protected int threadPri = DFLT_THREAD_PRI;
+
+    /** Heartbeat messages issuing frequency. */
+    protected long hbFreq = DFLT_HEARTBEAT_FREQ;
+
+    /** Size of topology snapshots history. */
+    protected int topHistSize = DFLT_TOP_HISTORY_SIZE;
+
+    /** Grid discovery listener. */
+    protected volatile DiscoverySpiListener lsnr;
+
+    /** Data exchange. */
+    protected DiscoverySpiDataExchange exchange;
+
+    /** Metrics provider. */
+    protected DiscoveryMetricsProvider metricsProvider;
+
+    /** Local node attributes. */
+    protected Map<String, Object> locNodeAttrs;
+
+    /** Local node version. */
+    protected IgniteProductVersion locNodeVer;
+
+    /** Local node. */
+    protected TcpDiscoveryNode locNode;
+
+    /** Local host. */
+    protected InetAddress locHost;
+
+    /** Internal and external addresses of local node. */
+    protected Collection<InetSocketAddress> locNodeAddrs;
+
+    /** Socket timeout worker. */
+    protected SocketTimeoutWorker sockTimeoutWorker;
+
+    /** Discovery state. */
+    protected TcpDiscoverySpiState spiState = DISCONNECTED;
+
+    /** Start time of the very first grid node. */
+    protected volatile long gridStartTime;
+
+    /** Marshaller. */
+    protected final IgniteMarshaller marsh = new IgniteJdkMarshaller();
+
+    /** Statistics. */
+    protected final TcpDiscoveryStatistics stats = new TcpDiscoveryStatistics();
+
+    /** Local node ID. */
+    @IgniteLocalNodeIdResource
+    protected UUID locNodeId;
+
+    /** Name of the grid. */
+    @IgniteNameResource
+    protected String gridName;
+
+    /** Logger. */
+    @IgniteLoggerResource
+    protected IgniteLogger log;
+
+    /**
+     * Sets local host IP address that discovery SPI uses.
+     * <p>
+     * If not provided, by default a first found non-loopback address
+     * will be used. If there is no non-loopback address available,
+     * then {@link InetAddress#getLocalHost()} will be used.
+     *
+     * @param locAddr IP address.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    @IgniteLocalHostResource
+    public void setLocalAddress(String locAddr) {
+        // Injection should not override value already set by Spring or user.
+        if (this.locAddr == null)
+            this.locAddr = locAddr;
+    }
+
+    /**
+     * Gets local address that was set to SPI with {@link #setLocalAddress(String)} method.
+     *
+     * @return local address.
+     */
+    public String getLocalAddress() {
+        return locAddr;
+    }
+
+    /**
+     * Gets IP finder for IP addresses sharing and storing.
+     *
+     * @return IP finder for IP addresses sharing and storing.
+     */
+    public TcpDiscoveryIpFinder getIpFinder() {
+        return ipFinder;
+    }
+
+    /**
+     * Sets IP finder for IP addresses sharing and storing.
+     * <p>
+     * If not provided {@link org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder} will be used by default.
+     *
+     * @param ipFinder IP finder.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public void setIpFinder(TcpDiscoveryIpFinder ipFinder) {
+        this.ipFinder = ipFinder;
+    }
+
+    /**
+     * Sets socket operations timeout. This timeout is used to limit connection time and
+     * write-to-socket time.
+     * <p>
+     * Note that when running GridGain on Amazon EC2, socket timeout must be set to a value
+     * significantly greater than the default (e.g. to {@code 30000}).
+     * <p>
+     * If not specified, default is {@link #DFLT_SOCK_TIMEOUT}.
+     *
+     * @param sockTimeout Socket connection timeout.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public void setSocketTimeout(long sockTimeout) {
+        this.sockTimeout = sockTimeout;
+    }
+
+    /**
+     * Sets timeout for receiving acknowledgement for sent message.
+     * <p>
+     * If acknowledgement is not received within this timeout, sending is considered as failed
+     * and SPI tries to repeat message sending.
+     * <p>
+     * If not specified, default is {@link #DFLT_ACK_TIMEOUT}.
+     *
+     * @param ackTimeout Acknowledgement timeout.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public void setAckTimeout(long ackTimeout) {
+        this.ackTimeout = ackTimeout;
+    }
+
+    /**
+     * Sets maximum network timeout to use for network operations.
+     * <p>
+     * If not specified, default is {@link #DFLT_NETWORK_TIMEOUT}.
+     *
+     * @param netTimeout Network timeout.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public void setNetworkTimeout(long netTimeout) {
+        this.netTimeout = netTimeout;
+    }
+
+    /**
+     * Sets thread priority. All threads within SPI will be started with it.
+     * <p>
+     * If not provided, default value is {@link #DFLT_THREAD_PRI}
+     *
+     * @param threadPri Thread priority.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public void setThreadPriority(int threadPri) {
+        this.threadPri = threadPri;
+    }
+
+    /**
+     * Sets delay between issuing of heartbeat messages. SPI sends heartbeat messages
+     * in configurable time interval to other nodes to notify them about its state.
+     * <p>
+     * If not provided, default value is {@link #DFLT_HEARTBEAT_FREQ}.
+     *
+     * @param hbFreq Heartbeat frequency in milliseconds.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public void setHeartbeatFrequency(long hbFreq) {
+        this.hbFreq = hbFreq;
+    }
+
+    /**
+     * Gets size of topology snapshots history.
+     *
+     * @return Size of topology snapshots history (stored as {@code int}, widened to {@code long} here).
+     */
+    public long getTopHistorySize() {
+        return topHistSize;
+    }
+
+    /**
+     * Sets size of topology snapshots history. Specified size should be greater than or equal to default size
+     * {@link #DFLT_TOP_HISTORY_SIZE}; smaller values are rejected with a warning and the current size is kept.
+     *
+     * @param topHistSize Size of topology snapshots history.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public void setTopHistorySize(int topHistSize) {
+        if (topHistSize < DFLT_TOP_HISTORY_SIZE) {
+            U.warn(log, "Topology history size should be greater than or equal to default size. " +
+                "Specified size will not be set [curSize=" + this.topHistSize + ", specifiedSize=" + topHistSize +
+                ", defaultSize=" + DFLT_TOP_HISTORY_SIZE + ']');
+
+            return;
+        }
+
+        this.topHistSize = topHistSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setNodeAttributes(Map<String, Object> attrs, IgniteProductVersion ver) {
+        assert locNodeAttrs == null;
+        assert locNodeVer == null;
+
+        if (log.isDebugEnabled()) {
+            log.debug("Node attributes to set: " + attrs);
+            log.debug("Node version to set: " + ver);
+        }
+
+        locNodeAttrs = attrs;
+        locNodeVer = ver;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException {
+        super.onContextInitialized0(spiCtx);
+
+        ipFinder.onSpiContextInitialized(spiCtx);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void onContextDestroyed0() {
+        super.onContextDestroyed0();
+
+        ipFinder.onSpiContextDestroyed();
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterNode getLocalNode() {
+        return locNode;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setListener(@Nullable DiscoverySpiListener lsnr) {
+        this.lsnr = lsnr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setDataExchange(DiscoverySpiDataExchange exchange) {
+        this.exchange = exchange;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setMetricsProvider(DiscoveryMetricsProvider metricsProvider) {
+        this.metricsProvider = metricsProvider;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getGridStartTime() {
+        assert gridStartTime != 0;
+
+        return gridStartTime;
+    }
+
+    /**
+     * Opens a socket to the given remote address: binds it to {@link #locHost} on an ephemeral port,
+     * disables Nagle's algorithm, connects within {@link #sockTimeout} and writes {@link U#GG_HEADER}.
+     * <p>
+     * If any of these steps fails, the socket is closed before the exception propagates.
+     *
+     * @param sockAddr Remote address (resolved here if it is unresolved).
+     * @return Opened socket.
+     * @throws IOException If failed.
+     */
+    protected Socket openSocket(InetSocketAddress sockAddr) throws IOException {
+        assert sockAddr != null;
+
+        InetSocketAddress resolved = sockAddr.isUnresolved() ?
+            new InetSocketAddress(InetAddress.getByName(sockAddr.getHostName()), sockAddr.getPort()) : sockAddr;
+
+        InetAddress addr = resolved.getAddress();
+
+        assert addr != null;
+
+        Socket sock = new Socket();
+
+        boolean success = false;
+
+        try {
+            sock.bind(new InetSocketAddress(locHost, 0));
+
+            sock.setTcpNoDelay(true);
+
+            sock.connect(resolved, (int)sockTimeout);
+
+            writeToSocket(sock, U.GG_HEADER);
+
+            success = true;
+
+            return sock;
+        }
+        finally {
+            // Do not leak the socket when bind, connect or the header write fails.
+            if (!success)
+                U.closeQuiet(sock);
+        }
+    }
+
+    /**
+     * Creates a write-timeout guard for the given socket and registers it with {@link #sockTimeoutWorker}.
+     * If the guard expires before it is cancelled, the worker closes the socket.
+     *
+     * @param sock Socket about to be written to.
+     * @return Registered guard; must be passed to {@link #finishTimedWrite(SocketTimeoutObject, IOException)}.
+     */
+    private SocketTimeoutObject startTimedWrite(Socket sock) {
+        SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout);
+
+        sockTimeoutWorker.addTimeoutObject(obj);
+
+        return obj;
+    }
+
+    /**
+     * Cancels a write-timeout guard and reports the outcome of the guarded write.
+     *
+     * @param obj Guard returned by {@link #startTimedWrite(Socket)}.
+     * @param err I/O error raised by the write, or {@code null} if the write itself did not fail.
+     * @throws IOException {@code err} if it is not {@code null}; otherwise a {@link SocketTimeoutException}
+     *      if the worker had already timed the guard out (and closed the socket) before it could be cancelled.
+     */
+    private void finishTimedWrite(SocketTimeoutObject obj, @Nullable IOException err) throws IOException {
+        boolean cancelled = obj.cancel();
+
+        // Guard was not fired - drop it so the worker does not hold it until its end time.
+        if (cancelled)
+            sockTimeoutWorker.removeTimeoutObject(obj);
+
+        // Throw original exception.
+        if (err != null)
+            throw err;
+
+        if (!cancelled)
+            throw new SocketTimeoutException("Write timed out (socket was concurrently closed).");
+    }
+
+    /**
+     * Writes message to the socket.
+     *
+     * @param sock Socket.
+     * @param data Raw data to write.
+     * @throws IOException If IO failed or write timed out.
+     */
+    @SuppressWarnings("ThrowFromFinallyBlock")
+    protected void writeToSocket(Socket sock, byte[] data) throws IOException {
+        assert sock != null;
+        assert data != null;
+
+        SocketTimeoutObject obj = startTimedWrite(sock);
+
+        IOException err = null;
+
+        try {
+            OutputStream out = sock.getOutputStream();
+
+            out.write(data);
+
+            out.flush();
+        }
+        catch (IOException e) {
+            err = e;
+        }
+        finally {
+            finishTimedWrite(obj, err);
+        }
+    }
+
+    /**
+     * Writes message to the socket.
+     *
+     * @param sock Socket.
+     * @param msg Message.
+     * @throws IOException If IO failed or write timed out.
+     * @throws GridException If marshalling failed.
+     */
+    protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg) throws IOException, GridException {
+        writeToSocket(sock, msg, new GridByteArrayOutputStream(8 * 1024)); // 8K.
+    }
+
+    /**
+     * Writes message to the socket.
+     * <p>
+     * The message is marshalled into {@code bout} before the write-timeout guard is started, so only the
+     * actual socket write is bounded by {@link #sockTimeout}.
+     *
+     * @param sock Socket.
+     * @param msg Message.
+     * @param bout Byte array output stream.
+     * @throws IOException If IO failed or write timed out.
+     * @throws GridException If marshalling failed.
+     */
+    @SuppressWarnings("ThrowFromFinallyBlock")
+    protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, GridByteArrayOutputStream bout)
+        throws IOException, GridException {
+        assert sock != null;
+        assert msg != null;
+        assert bout != null;
+
+        // Marshall message first to perform only write after.
+        marsh.marshal(msg, bout);
+
+        SocketTimeoutObject obj = startTimedWrite(sock);
+
+        IOException err = null;
+
+        try {
+            OutputStream out = sock.getOutputStream();
+
+            bout.writeTo(out);
+
+            out.flush();
+        }
+        catch (IOException e) {
+            err = e;
+        }
+        finally {
+            finishTimedWrite(obj, err);
+        }
+    }
+
+    /**
+     * Writes response to the socket.
+     *
+     * @param sock Socket.
+     * @param res Integer response (only the low-order byte is written, per {@link OutputStream#write(int)}).
+     * @throws IOException If IO failed or write timed out.
+     */
+    @SuppressWarnings("ThrowFromFinallyBlock")
+    protected void writeToSocket(Socket sock, int res) throws IOException {
+        assert sock != null;
+
+        SocketTimeoutObject obj = startTimedWrite(sock);
+
+        IOException err = null;
+
+        try {
+            // Obtained inside 'try' so a failure here still cancels the guard registered above.
+            OutputStream out = sock.getOutputStream();
+
+            out.write(res);
+
+            out.flush();
+        }
+        catch (IOException e) {
+            err = e;
+        }
+        finally {
+            finishTimedWrite(obj, err);
+        }
+    }
+
+    /**
+     * Reads message from the socket limiting read time.
+     * <p>
+     * The socket's {@code SO_TIMEOUT} is temporarily set to {@code timeout} and restored afterwards.
+     *
+     * @param sock Socket.
+     * @param in Input stream (in case socket stream was wrapped).
+     * @param timeout Socket timeout for this operation.
+     * @return Message.
+     * @throws IOException If IO failed or read timed out.
+     * @throws GridException If unmarshalling failed.
+     */
+    protected <T> T readMessage(Socket sock, @Nullable InputStream in, long timeout) throws IOException, GridException {
+        assert sock != null;
+
+        int oldTimeout = sock.getSoTimeout();
+
+        try {
+            sock.setSoTimeout((int)timeout);
+
+            return marsh.unmarshal(in == null ? sock.getInputStream() : in, U.gridClassLoader());
+        }
+        catch (IOException | GridException e) {
+            if (X.hasCause(e, SocketTimeoutException.class))
+                LT.warn(log, null, "Timed out waiting for message to be read (most probably, the reason is " +
+                    "in long GC pauses on remote node). Current timeout: " + timeout + '.');
+
+            throw e;
+        }
+        finally {
+            // Quietly restore timeout.
+            try {
+                sock.setSoTimeout(oldTimeout);
+            }
+            catch (SocketException ignored) {
+                // No-op.
+            }
+        }
+    }
+
+    /**
+     * Reads message delivery receipt from the socket.
+     * <p>
+     * The socket's {@code SO_TIMEOUT} is temporarily set to {@code timeout} and restored afterwards.
+     *
+     * @param sock Socket.
+     * @param timeout Socket timeout for this operation.
+     * @return Receipt.
+     * @throws IOException If IO failed or read timed out ({@link EOFException} if the stream is closed).
+     */
+    protected int readReceipt(Socket sock, long timeout) throws IOException {
+        assert sock != null;
+
+        int oldTimeout = sock.getSoTimeout();
+
+        try {
+            sock.setSoTimeout((int)timeout);
+
+            int res = sock.getInputStream().read();
+
+            if (res == -1)
+                throw new EOFException();
+
+            return res;
+        }
+        catch (SocketTimeoutException e) {
+            LT.warn(log, null, "Timed out waiting for message delivery receipt (most probably, the reason is " +
+                "in long GC pauses on remote node; consider tuning GC and increasing 'ackTimeout' " +
+                "configuration property). Will retry to send message with increased timeout. " +
+                "Current timeout: " + timeout + '.');
+
+            stats.onAckTimeout();
+
+            throw e;
+        }
+        finally {
+            // Quietly restore timeout.
+            try {
+                sock.setSoTimeout(oldTimeout);
+            }
+            catch (SocketException ignored) {
+                // No-op.
+            }
+        }
+    }
+
+    /**
+     * Resolves addresses registered in the IP finder, removes duplicates and addresses of the
+     * local node, and returns the remaining addresses in random order.
+     * <p>
+     * Retries every 2000 ms until the IP finder returns addresses without error. Addresses whose
+     * host cannot be resolved are kept in their unresolved form.
+     *
+     * @return Resolved addresses without duplicates and local address (potentially
+     *      empty but never null).
+     * @throws org.apache.ignite.spi.IgniteSpiException If an error occurs.
+     */
+    protected Collection<InetSocketAddress> resolvedAddresses() throws IgniteSpiException {
+        Collection<InetSocketAddress> addrs;
+
+        // Get consistent addresses collection.
+        while (true) {
+            try {
+                addrs = registeredAddresses();
+
+                break;
+            }
+            catch (IgniteSpiException e) {
+                LT.error(log, e, "Failed to get registered addresses from IP finder on start " +
+                    "(retrying every 2000 ms).");
+            }
+
+            try {
+                U.sleep(2000);
+            }
+            catch (GridInterruptedException e) {
+                throw new IgniteSpiException("Thread has been interrupted.", e);
+            }
+        }
+
+        // Linked set drops duplicates, e.g. "host:0" and "host:DFLT_PORT" both normalized by registeredAddresses().
+        Set<InetSocketAddress> uniq = new LinkedHashSet<>();
+
+        for (InetSocketAddress addr : addrs) {
+            assert addr != null;
+
+            try {
+                InetSocketAddress resolved = addr.isUnresolved() ?
+                    new InetSocketAddress(InetAddress.getByName(addr.getHostName()), addr.getPort()) : addr;
+
+                if (locNodeAddrs == null || !locNodeAddrs.contains(resolved))
+                    uniq.add(resolved);
+            }
+            catch (UnknownHostException ignored) {
+                LT.warn(log, null, "Failed to resolve address from IP finder (host is unknown): " + addr);
+
+                // Add address in any case.
+                uniq.add(addr);
+            }
+        }
+
+        List<InetSocketAddress> res = new ArrayList<>(uniq);
+
+        Collections.shuffle(res);
+
+        return res;
+    }
+
+    /**
+     * Gets addresses registered in the IP finder, initializes addresses having no
+     * port (or 0 port) with {@link #DFLT_PORT}.
+     *
+     * @return Registered addresses.
+     * @throws org.apache.ignite.spi.IgniteSpiException If an error occurs.
+     */
+    protected Collection<InetSocketAddress> registeredAddresses() throws IgniteSpiException {
+        Collection<InetSocketAddress> res = new ArrayList<>();
+
+        for (InetSocketAddress addr : ipFinder.getRegisteredAddresses()) {
+            if (addr.getPort() == 0)
+                addr = addr.isUnresolved() ? new InetSocketAddress(addr.getHostName(), DFLT_PORT) :
+                    new InetSocketAddress(addr.getAddress(), DFLT_PORT);
+
+            res.add(addr);
+        }
+
+        return res;
+    }
+
+    /**
+     * @param msg Message.
+     * @return Error.
+     */
+    protected IgniteSpiException duplicateIdError(TcpDiscoveryDuplicateIdMessage msg) {
+        assert msg != null;
+
+        return new IgniteSpiException("Local node has the same ID as existing node in topology " +
+            "(fix configuration and restart local node) [localNode=" + locNode +
+            ", existingNode=" + msg.node() + ']');
+    }
+
+    /**
+     * @param msg Message.
+     * @return Error.
+     */
+    protected IgniteSpiException authenticationFailedError(TcpDiscoveryAuthFailedMessage msg) {
+        assert msg != null;
+
+        return new IgniteSpiException(new GridAuthenticationException("Authentication failed [nodeId=" +
+            msg.creatorNodeId() + ", addr=" + msg.address().getHostAddress() + ']'));
+    }
+
+    /**
+     * @param msg Message.
+     * @return Error.
+     */
+    protected IgniteSpiException checkFailedError(TcpDiscoveryCheckFailedMessage msg) {
+        assert msg != null;
+
+        return versionCheckFailed(msg) ? new IgniteSpiVersionCheckException(msg.error()) :
+            new IgniteSpiException(msg.error());
+    }
+
+    /**
+     * @param msg Message.
+     * @return Whether delivery of the message is ensured (message class is annotated with
+     *      {@link TcpDiscoveryEnsureDelivery}).
+     */
+    protected boolean ensured(TcpDiscoveryAbstractMessage msg) {
+        return U.getAnnotation(msg.getClass(), TcpDiscoveryEnsureDelivery.class) != null;
+    }
+
+    /**
+     * @param msg Failed message.
+     * @return {@code True} if specified failed message relates to version incompatibility, {@code false} otherwise.
+     * @deprecated Parsing of error message was used for preserving backward compatibility. We should remove it
+     *      and create separate message for failed version check with next major release.
+     */
+    @Deprecated
+    private static boolean versionCheckFailed(TcpDiscoveryCheckFailedMessage msg) {
+        return msg.error().contains("versions are not compatible");
+    }
+
+    /**
+     * Handles sockets timeouts: closes sockets whose registered write guard expires before it is cancelled.
+     */
+    protected class SocketTimeoutWorker extends IgniteSpiThread {
+        /** Time-based sorted set for timeout objects. */
+        private final GridConcurrentSkipListSet<SocketTimeoutObject> timeoutObjs =
+            new GridConcurrentSkipListSet<>(new Comparator<SocketTimeoutObject>() {
+                @Override public int compare(SocketTimeoutObject o1, SocketTimeoutObject o2) {
+                    int res = Long.compare(o1.endTime(), o2.endTime());
+
+                    // Tie-break on the unique ID so distinct objects with equal end times are all kept in the set.
+                    return res != 0 ? res : Long.compare(o1.id(), o2.id());
+                }
+            });
+
+        /** Mutex. */
+        private final Object mux0 = new Object();
+
+        /**
+         * Creates worker thread with the SPI's configured thread priority.
+         */
+        SocketTimeoutWorker() {
+            super(gridName, "tcp-disco-sock-timeout-worker", log);
+
+            setPriority(threadPri);
+        }
+
+        /**
+         * @param timeoutObj Timeout object to add.
+         */
+        @SuppressWarnings({"NakedNotify"})
+        public void addTimeoutObject(SocketTimeoutObject timeoutObj) {
+            assert timeoutObj != null && timeoutObj.endTime() > 0 && timeoutObj.endTime() != Long.MAX_VALUE;
+
+            timeoutObjs.add(timeoutObj);
+
+            // Wake the worker only if the new object expires earlier than everything it is waiting on.
+            if (timeoutObjs.firstx() == timeoutObj) {
+                synchronized (mux0) {
+                    mux0.notifyAll();
+                }
+            }
+        }
+
+        /**
+         * @param timeoutObj Timeout object to remove.
+         */
+        public void removeTimeoutObject(SocketTimeoutObject timeoutObj) {
+            assert timeoutObj != null;
+
+            timeoutObjs.remove(timeoutObj);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException {
+            if (log.isDebugEnabled())
+                log.debug("Socket timeout worker has been started.");
+
+            while (!isInterrupted()) {
+                long now = U.currentTimeMillis();
+
+                for (Iterator<SocketTimeoutObject> iter = timeoutObjs.iterator(); iter.hasNext(); ) {
+                    SocketTimeoutObject timeoutObj = iter.next();
+
+                    if (timeoutObj.endTime() <= now) {
+                        iter.remove();
+
+                        if (timeoutObj.onTimeout()) {
+                            LT.warn(log, null, "Socket write has timed out (consider increasing " +
+                                "'sockTimeout' configuration property) [sockTimeout=" + sockTimeout + ']');
+
+                            stats.onSocketTimeout();
+                        }
+                    }
+                    else
+                        break;
+                }
+
+                synchronized (mux0) {
+                    while (true) {
+                        // Access of the first element must be inside of
+                        // synchronization block, so we don't miss out
+                        // on thread notification events sent from
+                        // 'addTimeoutObject(..)' method.
+                        SocketTimeoutObject first = timeoutObjs.firstx();
+
+                        if (first != null) {
+                            long waitTime = first.endTime() - U.currentTimeMillis();
+
+                            if (waitTime > 0)
+                                mux0.wait(waitTime);
+                            else
+                                break;
+                        }
+                        else
+                            mux0.wait(5000);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Socket timeout object.
+     */
+    protected static class SocketTimeoutObject {
+        /** */
+        private static final AtomicLong idGen = new AtomicLong();
+
+        /** */
+        private final long id = idGen.incrementAndGet();
+
+        /** */
+        private final Socket sock;
+
+        /** */
+        private final long endTime;
+
+        /** */
+        private final AtomicBoolean done = new AtomicBoolean();
+
+        /**
+         * @param sock Socket.
+         * @param endTime End time.
+         */
+        SocketTimeoutObject(Socket sock, long endTime) {
+            assert sock != null;
+            assert endTime > 0;
+
+            this.sock = sock;
+            this.endTime = endTime;
+        }
+
+        /**
+         * @return {@code True} if object has not yet been processed.
+         */
+        boolean cancel() {
+            return done.compareAndSet(false, true);
+        }
+
+        /**
+         * @return {@code True} if object has not yet been canceled.
+         */
+        boolean onTimeout() {
+            if (done.compareAndSet(false, true)) {
+                // Close socket - timeout occurred.
+                U.closeQuiet(sock);
+
+                return true;
+            }
+
+            return false;
+        }
+
+        /**
+         * @return End time.
+         */
+        long endTime() {
+            return endTime;
+        }
+
+        /**
+         * @return ID.
+         */
+        long id() {
+            return id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(SocketTimeoutObject.class, this);
+        }
+    }
+
+    /**
+     * Base class for message workers.
+     */
+    protected abstract class MessageWorkerAdapter extends IgniteSpiThread {
+        /** Pre-allocated output stream (100K). */
+        private final GridByteArrayOutputStream bout = new GridByteArrayOutputStream(100 * 1024);
+
+        /** Message queue. */
+        private final BlockingDeque<TcpDiscoveryAbstractMessage> queue = new LinkedBlockingDeque<>();
+
+        /** Backed interrupted flag. */
+        private volatile boolean interrupted;
+
+        /**
+         * @param name Thread name.
+         */
+        protected MessageWorkerAdapter(String name) {
+            super(gridName, name, log);
+
+            setPriority(threadPri);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException {
+            if (log.isDebugEnabled())
+                log.debug("Message worker started [locNodeId=" + locNodeId + ']');
+
+            while (!isInterrupted()) {
+                TcpDiscoveryAbstractMessage msg = queue.poll(2000, TimeUnit.MILLISECONDS);
+
+                if (msg == null)
+                    continue;
+
+                processMessage(msg);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void interrupt() {
+            interrupted = true;
+
+            super.interrupt();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isInterrupted() {
+            return interrupted || super.isInterrupted();
+        }
+
+        /**
+         * @return Current queue size.
+         */
+        int queueSize() {
+            return queue.size();
+        }
+
+        /**
+         * Adds message to queue. Heartbeat messages are put at the head of the queue,
+         * all other messages at the tail.
+         *
+         * @param msg Message to add.
+         */
+        void addMessage(TcpDiscoveryAbstractMessage msg) {
+            assert msg != null;
+
+            if (msg instanceof TcpDiscoveryHeartbeatMessage)
+                queue.addFirst(msg);
+            else
+                queue.add(msg);
+
+            if (log.isDebugEnabled())
+                log.debug("Message has been added to queue: " + msg);
+        }
+
+        /**
+         * Processes a message taken from the queue. Called from {@link #body()} for every
+         * non-{@code null} message polled from the queue.
+         *
+         * @param msg Message to process.
+         */
+        protected abstract void processMessage(TcpDiscoveryAbstractMessage msg);
+
+        /**
+         * Writes message to the socket reusing this worker's pre-allocated buffer.
+         * <p>
+         * NOTE(review): the buffer is shared per worker instance, so this presumably must only be
+         * called from the worker's own thread - confirm with subclasses.
+         *
+         * @param sock Socket.
+         * @param msg Message.
+         * @throws IOException If IO failed.
+         * @throws GridException If marshalling failed.
+         */
+        protected final void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg)
+            throws IOException, GridException {
+            bout.reset();
+
+            TcpDiscoverySpiAdapter.this.writeToSocket(sock, msg, bout);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java
new file mode 100644
index 0000000..5043d78
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java
@@ -0,0 +1,267 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import org.apache.ignite.mbean.*;
+import org.apache.ignite.spi.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Management bean for {@link TcpDiscoverySpi}.
+ */
+public interface TcpDiscoverySpiMBean extends IgniteSpiManagementMBean {
+    /**
+     * Gets delay between heartbeat messages sent by coordinator.
+     *
+     * @return Time period in milliseconds.
+     */
+    @IgniteMBeanDescription("Heartbeat frequency.")
+    public long getHeartbeatFrequency();
+
+    /**
+     * Gets current SPI state.
+     *
+     * @return Current SPI state.
+     */
+    @IgniteMBeanDescription("SPI state.")
+    public String getSpiState();
+
+    /**
+     * Gets {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder} (string representation).
+     *
+     * @return IPFinder (string representation).
+     */
+    @IgniteMBeanDescription("IP Finder.")
+    public String getIpFinderFormatted();
+
+    /**
+     * Gets number of connection attempts.
+     *
+     * @return Number of connection attempts.
+     */
+    @IgniteMBeanDescription("Reconnect count.")
+    public int getReconnectCount();
+
+    /**
+     * Gets network timeout.
+     *
+     * @return Network timeout.
+     */
+    @IgniteMBeanDescription("Network timeout.")
+    public long getNetworkTimeout();
+
+    /**
+     * Gets local TCP port SPI listens to.
+     *
+     * @return Local TCP port.
+     */
+    @IgniteMBeanDescription("Local TCP port.")
+    public int getLocalPort();
+
+    /**
+     * Gets local TCP port range.
+     *
+     * @return Local port range.
+     */
+    @IgniteMBeanDescription("Local TCP port range.")
+    public int getLocalPortRange();
+
+    /**
+     * Gets max heartbeats count node can miss without initiating status check.
+     *
+     * @return Max missed heartbeats.
+     */
+    @IgniteMBeanDescription("Max missed heartbeats.")
+    public int getMaxMissedHeartbeats();
+
+    /**
+     * Gets max heartbeats count node can miss without failing client node.
+     *
+     * @return Max missed client heartbeats.
+     */
+    @IgniteMBeanDescription("Max missed client heartbeats.")
+    public int getMaxMissedClientHeartbeats();
+
+    /**
+     * Gets thread priority. All threads within SPI will be started with it.
+     *
+     * @return Thread priority.
+     */
+    @IgniteMBeanDescription("Threads priority.")
+    public int getThreadPriority();
+
+    /**
+     * Gets IP finder clean frequency.
+     *
+     * @return IP finder clean frequency.
+     */
+    @IgniteMBeanDescription("IP finder clean frequency.")
+    public long getIpFinderCleanFrequency();
+
+    /**
+     * Gets statistics print frequency.
+     *
+     * @return Statistics print frequency in milliseconds.
+     */
+    @IgniteMBeanDescription("Statistics print frequency.")
+    public long getStatisticsPrintFrequency();
+
+    /**
+     * Gets message worker queue current size.
+     *
+     * @return Message worker queue current size.
+     */
+    @IgniteMBeanDescription("Message worker queue current size.")
+    public int getMessageWorkerQueueSize();
+
+    /**
+     * Gets joined nodes count.
+     *
+     * @return Nodes joined count.
+     */
+    @IgniteMBeanDescription("Nodes joined count.")
+    public long getNodesJoined();
+
+    /**
+     * Gets left nodes count.
+     *
+     * @return Left nodes count.
+     */
+    @IgniteMBeanDescription("Nodes left count.")
+    public long getNodesLeft();
+
+    /**
+     * Gets failed nodes count.
+     *
+     * @return Failed nodes count.
+     */
+    @IgniteMBeanDescription("Nodes failed count.")
+    public long getNodesFailed();
+
+    /**
+     * Gets pending messages registered count.
+     *
+     * @return Pending messages registered count.
+     */
+    @IgniteMBeanDescription("Pending messages registered.")
+    public long getPendingMessagesRegistered();
+
+    /**
+     * Gets pending messages discarded count.
+     *
+     * @return Pending messages discarded count.
+     */
+    @IgniteMBeanDescription("Pending messages discarded.")
+    public long getPendingMessagesDiscarded();
+
+    /**
+     * Gets avg message processing time.
+     *
+     * @return Avg message processing time.
+     */
+    @IgniteMBeanDescription("Avg message processing time.")
+    public long getAvgMessageProcessingTime();
+
+    /**
+     * Gets max message processing time.
+     *
+     * @return Max message processing time.
+     */
+    @IgniteMBeanDescription("Max message processing time.")
+    public long getMaxMessageProcessingTime();
+
+    /**
+     * Gets total received messages count.
+     *
+     * @return Total received messages count.
+     */
+    @IgniteMBeanDescription("Total received messages count.")
+    public int getTotalReceivedMessages();
+
+    /**
+     * Gets received messages counts (grouped by type).
+     *
+     * @return Map containing message types and respective counts.
+     */
+    @IgniteMBeanDescription("Received messages by type.")
+    public Map<String, Integer> getReceivedMessages();
+
+    /**
+     * Gets total processed messages count.
+     *
+     * @return Total processed messages count.
+     */
+    @IgniteMBeanDescription("Total processed messages count.")
+    public int getTotalProcessedMessages();
+
+    /**
+     * Gets processed messages counts (grouped by type).
+     *
+     * @return Map containing message types and respective counts.
+     */
+    @IgniteMBeanDescription("Processed messages by type.")
+    public Map<String, Integer> getProcessedMessages();
+
+    /**
+     * Gets time local node has been coordinator since.
+     *
+     * @return Time local node is coordinator since.
+     */
+    @IgniteMBeanDescription("Local node is coordinator since.")
+    public long getCoordinatorSinceTimestamp();
+
+    /**
+     * Gets current coordinator.
+     *
+     * @return ID of the current coordinator node, possibly {@code null}.
+     */
+    @IgniteMBeanDescription("Coordinator node ID.")
+    @Nullable public UUID getCoordinator();
+
+    /**
+     * Gets message acknowledgement timeout.
+     *
+     * @return Message acknowledgement timeout.
+     */
+    @IgniteMBeanDescription("Message acknowledgement timeout.")
+    public long getAckTimeout();
+
+    /**
+     * Gets maximum message acknowledgement timeout.
+     *
+     * @return Maximum message acknowledgement timeout.
+     */
+    @IgniteMBeanDescription("Maximum message acknowledgement timeout.")
+    public long getMaxAckTimeout();
+
+    /**
+     * Gets socket timeout.
+     *
+     * @return Socket timeout.
+     */
+    @IgniteMBeanDescription("Socket timeout.")
+    public long getSocketTimeout();
+
+    /**
+     * Gets join timeout.
+     *
+     * @return Join timeout.
+     */
+    @IgniteMBeanDescription("Join timeout.")
+    public long getJoinTimeout();
+
+    /**
+     * Dumps debug info using configured logger.
+     */
+    @IgniteMBeanDescription("Dump debug info.")
+    public void dumpDebugInfo();
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java new file mode 100644 index 0000000..5658545 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java @@ -0,0 +1,443 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.spi.discovery.tcp.internal; + +import org.apache.ignite.cluster.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.product.*; +import org.gridgain.grid.kernal.*; +import org.apache.ignite.spi.discovery.*; +import org.gridgain.grid.util.lang.*; +import org.gridgain.grid.util.tostring.*; +import org.gridgain.grid.util.typedef.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.net.*; +import java.util.*; + +import static org.gridgain.grid.kernal.GridNodeAttributes.*; + +/** + * Node for {@link org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi}. + * <p> + * <strong>This class is not intended for public use</strong> and has been made + * <tt>public</tt> due to certain limitations of Java technology.
+ */
+public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements ClusterNode,
+    Comparable<TcpDiscoveryNode>, Externalizable {
+    /** Serialization version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Node ID. */
+    private UUID id;
+
+    /** Consistent ID, computed by {@code U.consistentId(addrs, discPort)}; recomputed on deserialization. */
+    private Object consistentId;
+
+    /** Node attributes. */
+    @GridToStringExclude
+    private Map<String, Object> attrs;
+
+    /** Internal discovery addresses as strings. */
+    @GridToStringInclude
+    private Collection<String> addrs;
+
+    /** Internal discovery host names as strings. */
+    private Collection<String> hostNames;
+
+    /** Socket addresses computed by {@code U.toSocketAddresses(this, discPort)}; recomputed on deserialization. */
+    @GridToStringInclude
+    private Collection<InetSocketAddress> sockAddrs;
+
+    /** Discovery port. */
+    @GridToStringInclude
+    private int discPort;
+
+    /** Node metrics. */
+    @GridToStringExclude
+    private volatile ClusterNodeMetrics metrics;
+
+    /** Node order in the topology. */
+    private volatile long order;
+
+    /** Node order in the topology (internal). */
+    private volatile long intOrder;
+
+    /** The most recent time when heartbeat message was received from the node. */
+    @GridToStringExclude
+    private volatile long lastUpdateTime = U.currentTimeMillis();
+
+    /** Metrics provider (not serialized; only set by the full constructor). */
+    @GridToStringExclude
+    private DiscoveryMetricsProvider metricsProvider;
+
+    /** Visible flag (not serialized). */
+    @GridToStringExclude
+    private boolean visible;
+
+    /** Grid local node flag (not serialized). */
+    private boolean loc;
+
+    /** Version. */
+    private IgniteProductVersion ver;
+
+    /** Alive check (used by clients). */
+    @GridToStringExclude
+    private transient int aliveCheck;
+
+    /** Client router node ID; a non-{@code null} value marks this node as a client (see {@link #isClient()}). */
+    @GridToStringExclude
+    private UUID clientRouterNodeId;
+
+    /**
+     * Public default no-arg constructor for {@link Externalizable} interface.
+     */
+    public TcpDiscoveryNode() {
+        // No-op.
+    }
+
+    /**
+     * Creates a node and derives its consistent ID, initial metrics and socket addresses.
+     *
+     * @param id Node ID.
+     * @param addrs Addresses (must not be empty).
+     * @param hostNames Host names.
+     * @param discPort Discovery port.
+     * @param metricsProvider Metrics provider, used by {@link #metrics()} to refresh metrics.
+     * @param ver Version.
+     */
+    public TcpDiscoveryNode(UUID id, Collection<String> addrs, Collection<String> hostNames, int discPort,
+        DiscoveryMetricsProvider metricsProvider, IgniteProductVersion ver) {
+        assert id != null;
+        assert !F.isEmpty(addrs);
+        assert metricsProvider != null;
+        assert ver != null;
+
+        this.id = id;
+        this.addrs = addrs;
+        this.hostNames = hostNames;
+        this.discPort = discPort;
+        this.metricsProvider = metricsProvider;
+        this.ver = ver;
+
+        consistentId = U.consistentId(addrs, discPort);
+
+        metrics = metricsProvider.getMetrics();
+        sockAddrs = U.toSocketAddresses(this, discPort);
+    }
+
+    /** {@inheritDoc} */
+    @Override public UUID id() {
+        return id;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object consistentId() {
+        return consistentId;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public <T> T attribute(String name) {
+        // Even though discovery SPI removes this attribute after authentication, keep this check for safety.
+        if (GridNodeAttributes.ATTR_SECURITY_CREDENTIALS.equals(name))
+            return null;
+
+        return (T)attrs.get(name);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<String, Object> attributes() {
+        // Even though discovery SPI removes this attribute after authentication, keep this check for safety.
+        return F.view(attrs, new IgnitePredicate<String>() {
+            @Override public boolean apply(String s) {
+                return !GridNodeAttributes.ATTR_SECURITY_CREDENTIALS.equals(s);
+            }
+        });
+    }
+
+    /**
+     * Sets node attributes.
+     *
+     * @param attrs Node attributes (stored via {@code U.sealMap}).
+     */
+    public void setAttributes(Map<String, Object> attrs) {
+        this.attrs = U.sealMap(attrs);
+    }
+
+    /**
+     * Gets node attributes without filtering. Unlike {@link #attributes()}, the security
+     * credentials attribute is not hidden.
+     *
+     * @return Node attributes without filtering.
+     */
+    public Map<String, Object> getAttributes() {
+        return attrs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterNodeMetrics metrics() {
+        // Refresh from the provider when one was given to the constructor; deserialized
+        // instances have no provider and return the last metrics that were set or read.
+        if (metricsProvider != null)
+            metrics = metricsProvider.getMetrics();
+
+        return metrics;
+    }
+
+    /**
+     * Sets node metrics.
+     *
+     * @param metrics Node metrics.
+     */
+    public void setMetrics(ClusterNodeMetrics metrics) {
+        assert metrics != null;
+
+        this.metrics = metrics;
+    }
+
+    /**
+     * @return Internal order.
+     */
+    public long internalOrder() {
+        return intOrder;
+    }
+
+    /**
+     * @param intOrder Internal order of the node (must be positive).
+     */
+    public void internalOrder(long intOrder) {
+        assert intOrder > 0;
+
+        this.intOrder = intOrder;
+    }
+
+    /**
+     * @return Node order in the topology.
+     */
+    @Override public long order() {
+        return order;
+    }
+
+    /**
+     * @param order Order of the node (must be non-negative).
+     */
+    public void order(long order) {
+        assert order >= 0 : "Order is invalid: " + this;
+
+        this.order = order;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteProductVersion version() {
+        return ver;
+    }
+
+    /**
+     * @param ver Version.
+     */
+    public void version(IgniteProductVersion ver) {
+        assert ver != null;
+
+        this.ver = ver;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<String> addresses() {
+        return addrs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isLocal() {
+        return loc;
+    }
+
+    /**
+     * @param loc Grid local node flag.
+     */
+    public void local(boolean loc) {
+        this.loc = loc;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isDaemon() {
+        return "true".equalsIgnoreCase((String)attribute(ATTR_DAEMON));
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<String> hostNames() {
+        return hostNames;
+    }
+
+    /**
+     * @return Discovery port.
+     */
+    public int discoveryPort() {
+        return discPort;
+    }
+
+    /**
+     * @return Addresses that could be used by discovery.
+     */
+    public Collection<InetSocketAddress> socketAddresses() {
+        return sockAddrs;
+    }
+
+    /**
+     * Gets node last update time.
+     *
+     * @return Time of the last heartbeat.
+     */
+    public long lastUpdateTime() {
+        return lastUpdateTime;
+    }
+
+    /**
+     * Sets node last update time.
+     *
+     * @param lastUpdateTime Time of last metrics update (must be positive).
+     */
+    public void lastUpdateTime(long lastUpdateTime) {
+        assert lastUpdateTime > 0;
+
+        this.lastUpdateTime = lastUpdateTime;
+    }
+
+    /**
+     * Gets visible flag.
+     *
+     * @return {@code true} if node is in visible state.
+     */
+    public boolean visible() {
+        return visible;
+    }
+
+    /**
+     * Sets visible flag.
+     *
+     * @param visible {@code true} if node is in visible state.
+     */
+    public void visible(boolean visible) {
+        this.visible = visible;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isClient() {
+        // A node is a client exactly when it has a router node assigned.
+        return clientRouterNodeId != null;
+    }
+
+    /**
+     * Decrements alive check value and returns new one. Only valid for client nodes (asserted).
+     * <p>
+     * NOTE(review): {@code aliveCheck} is a plain, non-volatile field updated with {@code --};
+     * confirm it is only accessed from a single thread.
+     *
+     * @return Alive check value.
+     */
+    public int decrementAliveCheck() {
+        assert isClient();
+
+        return --aliveCheck;
+    }
+
+    /**
+     * Sets alive check value. Only valid for client nodes (asserted).
+     *
+     * @param aliveCheck Alive check value.
+     */
+    public void aliveCheck(int aliveCheck) {
+        assert isClient();
+
+        this.aliveCheck = aliveCheck;
+    }
+
+    /**
+     * @return Client router node ID.
+     */
+    public UUID clientRouterNodeId() {
+        return clientRouterNodeId;
+    }
+
+    /**
+     * @param clientRouterNodeId Client router node ID.
+     */
+    public void clientRouterNodeId(UUID clientRouterNodeId) {
+        this.clientRouterNodeId = clientRouterNodeId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int compareTo(@Nullable TcpDiscoveryNode node) {
+        if (node == null)
+            return 1;
+
+        // Two nodes may only share an internal order if they are the same node.
+        if (internalOrder() == node.internalOrder())
+            assert id().equals(node.id()) : "Duplicate order [this=" + this + ", other=" + node + ']';
+
+        // Primary key: internal order; node ID is the tie-breaker.
+        return internalOrder() < node.internalOrder() ? -1 : internalOrder() > node.internalOrder() ?
+            1 :
+            id().compareTo(node.id());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        // NOTE: the write order below must stay in sync with readExternal().
+        U.writeUuid(out, id);
+        U.writeMap(out, attrs);
+        U.writeCollection(out, addrs);
+        U.writeCollection(out, hostNames);
+        out.writeInt(discPort);
+
+        // Metrics are written as a fixed-size byte array, or as null when absent.
+        byte[] mtr = null;
+
+        if (metrics != null) {
+            mtr = new byte[DiscoveryMetricsHelper.METRICS_SIZE];
+
+            DiscoveryMetricsHelper.serialize(mtr, 0, metrics);
+        }
+
+        U.writeByteArray(out, mtr);
+
+        out.writeLong(order);
+        out.writeLong(intOrder);
+        out.writeObject(ver);
+        U.writeUuid(out, clientRouterNodeId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        // Reads fields in the same order writeExternal() writes them. The metrics provider,
+        // visible/local flags and alive check are not serialized and are left at defaults.
+        id = U.readUuid(in);
+
+        attrs = U.sealMap(U.<String, Object>readMap(in));
+        addrs = U.readCollection(in);
+        hostNames = U.readCollection(in);
+        discPort = in.readInt();
+
+        // Derived fields are not serialized; rebuild them from the data just read.
+        sockAddrs = U.toSocketAddresses(this, discPort);
+
+        consistentId = U.consistentId(addrs, discPort);
+
+        byte[] mtr = U.readByteArray(in);
+
+        if (mtr != null)
+            metrics = DiscoveryMetricsHelper.deserialize(mtr, 0);
+
+        order = in.readLong();
+        intOrder = in.readLong();
+        ver = (IgniteProductVersion)in.readObject();
+        clientRouterNodeId = U.readUuid(in);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        // NOTE(review): assumes F.eqNodes() compares node IDs, consistent with this hash.
+        return id.hashCode();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        return F.eqNodes(this, o);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(TcpDiscoveryNode.class, this, "isClient", isClient());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
new file mode 100644
index 0000000..9c57f4e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
@@ -0,0 +1,636 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.spi.discovery.tcp.internal;
+
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.lang.*;
+import org.gridgain.grid.util.tostring.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.locks.*;
+
+/**
+ * Convenient way to represent topology for {@link org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi}.
+ * <p>
+ * Nodes are kept in a {@link TreeSet} ordered by {@link TcpDiscoveryNode#compareTo(TcpDiscoveryNode)}
+ * (internal order, then node ID). Ring state is read and modified under an internal {@link ReadWriteLock}.
+ */
+public class TcpDiscoveryNodesRing {
+    /** Visible nodes filter. */
+    private static final IgnitePredicate<TcpDiscoveryNode> VISIBLE_NODES = new P1<TcpDiscoveryNode>() {
+        @Override public boolean apply(TcpDiscoveryNode node) {
+            return node.visible();
+        }
+    };
+
+    /** Client nodes filter. */
+    private static final PN CLIENT_NODES = new PN() {
+        @Override public boolean apply(ClusterNode node) {
+            return node.isClient();
+        }
+    };
+
+    /** Local node. */
+    private TcpDiscoveryNode locNode;
+
+    /**
+     * All nodes in topology, sorted. Mutators replace this set with a copy before changing it
+     * (copy-on-write) instead of modifying the existing instance.
+     */
+    @GridToStringInclude
+    private NavigableSet<TcpDiscoveryNode> nodes = new TreeSet<>();
+
+    /** All nodes in topology, keyed by node ID. */
+    @GridToStringExclude
+    private Map<UUID, TcpDiscoveryNode> nodesMap = new HashMap<>();
+
+    /** Current topology version. */
+    private long topVer;
+
+    /**
+     * Last internal node order that was added or handed out; {@code 0} means it is re-derived
+     * from the last node in the ring by {@link #nextNodeOrder()}.
+     */
+    private long nodeOrder;
+
+    /** Lock. */
+    @GridToStringExclude
+    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+
+    /**
+     * Sets local node and resets the ring (see {@link #clear()}) so it contains only that node.
+     *
+     * @param locNode Local node.
+     */
+    public void localNode(TcpDiscoveryNode locNode) {
+        assert locNode != null;
+
+        rwLock.writeLock().lock();
+
+        try {
+            this.locNode = locNode;
+
+            clear();
+        }
+        finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Gets all nodes in the topology.
+     *
+     * @return Unmodifiable collection of all nodes.
+     */
+    public Collection<TcpDiscoveryNode> allNodes() {
+        return nodes();
+    }
+
+    /**
+     * Gets visible nodes in the topology.
+     *
+     * @return Unmodifiable collection of visible nodes.
+     */
+    public Collection<TcpDiscoveryNode> visibleNodes() {
+        return nodes(VISIBLE_NODES);
+    }
+
+    /**
+     * Gets remote nodes.
+     *
+     * @return Unmodifiable collection of remote nodes in grid.
+     */
+    public Collection<TcpDiscoveryNode> remoteNodes() {
+        return nodes(F.remoteNodes(locNode.id()));
+    }
+
+    /**
+     * Gets visible remote nodes in the topology.
+     *
+     * @return Unmodifiable collection of visible remote nodes.
+     */
+    public Collection<TcpDiscoveryNode> visibleRemoteNodes() {
+        return nodes(VISIBLE_NODES, F.remoteNodes(locNode.id()));
+    }
+
+    /**
+     * @return Unmodifiable collection of client nodes.
+     */
+    public Collection<TcpDiscoveryNode> clientNodes() {
+        return nodes(CLIENT_NODES);
+    }
+
+    /**
+     * Checks whether the topology has remote nodes in.
+     *
+     * @return {@code true} if the topology has remote nodes in.
+     */
+    public boolean hasRemoteNodes() {
+        rwLock.readLock().lock();
+
+        try {
+            return nodes.size() > 1;
+        }
+        finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Adds node to topology, also initializes node last update time with current
+     * system time.
+     *
+     * @param node Node to add. Its internal order must be positive and greater than the
+     *      internal order of every node already in the ring.
+     * @return {@code true} if such node was added and was not previously present in the topology.
+     */
+    public boolean add(TcpDiscoveryNode node) {
+        assert node != null;
+        assert node.internalOrder() > 0;
+
+        rwLock.writeLock().lock();
+
+        try {
+            if (nodesMap.containsKey(node.id()))
+                return false;
+
+            assert node.internalOrder() > maxInternalOrder() : "Adding node to the middle of the ring " +
+                "[ring=" + this + ", node=" + node + ']';
+
+            nodesMap.put(node.id(), node);
+
+            nodes = new TreeSet<>(nodes);
+
+            node.lastUpdateTime(U.currentTimeMillis());
+
+            nodes.add(node);
+
+            nodeOrder = node.internalOrder();
+        }
+        finally {
+            rwLock.writeLock().unlock();
+        }
+
+        return true;
+    }
+
+    /**
+     * Gets internal order of the last (highest-ordered) node in the ring.
+     *
+     * @return Max internal order, or {@code -1} if the ring is empty.
+     */
+    public long maxInternalOrder() {
+        rwLock.readLock().lock();
+
+        try {
+            // TreeSet.last() throws NoSuchElementException on an empty set (it never returns null),
+            // so emptiness must be checked explicitly to honour the -1 contract.
+            return nodes.isEmpty() ? -1 : nodes.last().internalOrder();
+        }
+        finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Restores topology from parameters values.
+     * <p>
+     * This method is called when new node receives topology from coordinator.
+     * In this case all nodes received are remote for local.
+     * <p>
+     * Also initializes nodes last update time with current system time.
+     * <p>
+     * Note that the ring's own topology version is reset to {@code 0} by {@link #clear()}
+     * and is not set here.
+     *
+     * @param nodes List of remote nodes.
+     * @param topVer Topology version; also becomes the local node's internal order and the current node order.
+     */
+    public void restoreTopology(Iterable<TcpDiscoveryNode> nodes, long topVer) {
+        assert !F.isEmpty(nodes);
+        assert topVer > 0;
+
+        rwLock.writeLock().lock();
+
+        try {
+            locNode.internalOrder(topVer);
+
+            clear();
+
+            boolean firstAdd = true;
+
+            for (TcpDiscoveryNode node : nodes) {
+                if (nodesMap.containsKey(node.id()))
+                    continue;
+
+                nodesMap.put(node.id(), node);
+
+                if (firstAdd) {
+                    this.nodes = new TreeSet<>(this.nodes);
+
+                    firstAdd = false;
+                }
+
+                node.lastUpdateTime(U.currentTimeMillis());
+
+                this.nodes.add(node);
+            }
+
+            nodeOrder = topVer;
+        }
+        finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Finds node by ID.
+     *
+     * @param nodeId Node id to find.
+     * @return Node with ID provided or {@code null} if not found.
+     */
+    @Nullable public TcpDiscoveryNode node(UUID nodeId) {
+        assert nodeId != null;
+
+        rwLock.readLock().lock();
+
+        try {
+            return nodesMap.get(nodeId);
+        }
+        finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Removes node from the topology.
+     *
+     * @param nodeId ID of the node to remove (must not be the local node).
+     * @return Removed node or {@code null} if no node with such ID was in the topology.
+     */
+    @Nullable public TcpDiscoveryNode removeNode(UUID nodeId) {
+        assert nodeId != null;
+        assert !locNode.id().equals(nodeId);
+
+        rwLock.writeLock().lock();
+
+        try {
+            TcpDiscoveryNode rmv = nodesMap.remove(nodeId);
+
+            if (rmv != null) {
+                nodes = new TreeSet<>(nodes);
+
+                nodes.remove(rmv);
+            }
+
+            return rmv;
+        }
+        finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Removes nodes from the topology.
+     *
+     * @param nodeIds IDs of the nodes to remove.
+     * @return Collection of removed nodes (empty if none of the IDs were present).
+     */
+    public Collection<TcpDiscoveryNode> removeNodes(Collection<UUID> nodeIds) {
+        assert !F.isEmpty(nodeIds);
+
+        rwLock.writeLock().lock();
+
+        try {
+            boolean firstRmv = true;
+
+            Collection<TcpDiscoveryNode> res = null;
+
+            for (UUID id : nodeIds) {
+                TcpDiscoveryNode rmv = nodesMap.remove(id);
+
+                if (rmv != null) {
+                    if (firstRmv) {
+                        nodes = new TreeSet<>(nodes);
+
+                        res = new ArrayList<>(nodeIds.size());
+
+                        firstRmv = false;
+                    }
+
+                    nodes.remove(rmv);
+
+                    res.add(rmv);
+                }
+            }
+
+            return res == null ? Collections.<TcpDiscoveryNode>emptyList() : res;
+        }
+        finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Removes all remote nodes, leaves only local node (if set). Also resets the node order
+     * and the topology version to {@code 0}.
+     * <p>
+     * This should be called when SPI should be disconnected from topology and
+     * reconnected back after.
+     */
+    public void clear() {
+        rwLock.writeLock().lock();
+
+        try {
+            nodes = new TreeSet<>();
+
+            if (locNode != null)
+                nodes.add(locNode);
+
+            nodesMap = new HashMap<>();
+
+            if (locNode != null)
+                nodesMap.put(locNode.id(), locNode);
+
+            nodeOrder = 0;
+
+            topVer = 0;
+        }
+        finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Finds coordinator in the topology.
+     *
+     * @return Coordinator node that gives versions to topology (server node with the smallest order).
+     */
+    @Nullable public TcpDiscoveryNode coordinator() {
+        rwLock.readLock().lock();
+
+        try {
+            if (F.isEmpty(nodes))
+                return null;
+
+            return coordinator(null);
+        }
+        finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Finds coordinator in the topology filtering excluded nodes from the search.
+     * Client nodes are never selected.
+     * <p>
+     * This may be used when handling current coordinator leave or failure.
+     *
+     * @param excluded Nodes to exclude from the search (optional).
+     * @return Coordinator node among remaining nodes or {@code null} if all nodes are excluded.
+     */
+    @Nullable public TcpDiscoveryNode coordinator(@Nullable Collection<TcpDiscoveryNode> excluded) {
+        rwLock.readLock().lock();
+
+        try {
+            Collection<TcpDiscoveryNode> filtered = serverNodes(excluded);
+
+            if (F.isEmpty(filtered))
+                return null;
+
+            return Collections.min(filtered);
+        }
+        finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Finds next node in the topology.
+     *
+     * @return Next node.
+     */
+    @Nullable public TcpDiscoveryNode nextNode() {
+        rwLock.readLock().lock();
+
+        try {
+            if (nodes.size() < 2)
+                return null;
+
+            return nextNode(null);
+        }
+        finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Finds next node in the topology filtering excluded nodes from search.
+     * Client nodes are skipped.
+     * <p>
+     * This may be used when detecting and handling nodes failure.
+     *
+     * @param excluded Nodes to exclude from the search (optional). If provided,
+     * cannot contain local node.
+     * @return Next node (wrapping around to the first server node) or {@code null} if all nodes
+     * were filtered out or topology contains less than two nodes.
+     */
+    @Nullable public TcpDiscoveryNode nextNode(@Nullable Collection<TcpDiscoveryNode> excluded) {
+        assert excluded == null || excluded.isEmpty() || !excluded.contains(locNode);
+
+        rwLock.readLock().lock();
+
+        try {
+            Collection<TcpDiscoveryNode> filtered = serverNodes(excluded);
+
+            if (filtered.size() < 2)
+                return null;
+
+            Iterator<TcpDiscoveryNode> iter = filtered.iterator();
+
+            // Advance the iterator up to and including the local node.
+            while (iter.hasNext()) {
+                TcpDiscoveryNode node = iter.next();
+
+                if (locNode.equals(node))
+                    break;
+            }
+
+            // The element after the local node is the next one; wrap to the first if local is last.
+            return iter.hasNext() ? iter.next() : F.first(filtered);
+        }
+        finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Finds previous node in the topology.
+     *
+     * @return Previous node.
+     */
+    @Nullable public TcpDiscoveryNode previousNode() {
+        rwLock.readLock().lock();
+
+        try {
+            if (nodes.size() < 2)
+                return null;
+
+            return previousNode(null);
+        }
+        finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Finds previous node in the topology filtering excluded nodes from search.
+     * Client nodes are skipped.
+     *
+     * @param excluded Nodes to exclude from the search (optional). If provided,
+     * cannot contain local node.
+     * @return Previous node (wrapping around to the last server node) or {@code null} if all
+     * nodes were filtered out or topology contains less than two nodes.
+     */
+    @Nullable public TcpDiscoveryNode previousNode(@Nullable Collection<TcpDiscoveryNode> excluded) {
+        assert excluded == null || excluded.isEmpty() || !excluded.contains(locNode);
+
+        rwLock.readLock().lock();
+
+        try {
+            Collection<TcpDiscoveryNode> filtered = serverNodes(excluded);
+
+            if (filtered.size() < 2)
+                return null;
+
+            TcpDiscoveryNode prev = null;
+
+            // Walk the ring in order and remember the last node seen before the local one.
+            // If the local node comes first there is no such node yet, so keep walking:
+            // the loop then ends on the last node, which is the predecessor in a cycle.
+            for (TcpDiscoveryNode node : filtered) {
+                if (prev != null && locNode.equals(node))
+                    break;
+
+                prev = node;
+            }
+
+            return prev;
+        }
+        finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Gets current topology version.
+     *
+     * @return Current topology version.
+     */
+    public long topologyVersion() {
+        rwLock.readLock().lock();
+
+        try {
+            return topVer;
+        }
+        finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Sets new topology version.
+     *
+     * @param topVer New topology version (should be greater than current, otherwise no-op).
+     * @return {@code True} if topology has been changed.
+     */
+    public boolean topologyVersion(long topVer) {
+        rwLock.writeLock().lock();
+
+        try {
+            if (this.topVer < topVer) {
+                this.topVer = topVer;
+
+                return true;
+            }
+
+            return false;
+        }
+        finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Increments topology version and gets new value.
+     *
+     * @return Topology version (incremented).
+     */
+    public long incrementTopologyVersion() {
+        rwLock.writeLock().lock();
+
+        try {
+            return ++topVer;
+        }
+        finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Increments internal node order and gets new value. If the node order has been reset
+     * to {@code 0} (e.g. by {@link #clear()}), it is first re-derived from the internal order
+     * of the last node in the ring.
+     *
+     * @return Next internal node order.
+     */
+    public long nextNodeOrder() {
+        rwLock.writeLock().lock();
+
+        try {
+            if (nodeOrder == 0) {
+                // TreeSet.last() throws on an empty set rather than returning null.
+                assert !nodes.isEmpty() : "Cannot derive node order from an empty ring.";
+
+                nodeOrder = nodes.last().internalOrder();
+            }
+
+            return ++nodeOrder;
+        }
+        finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * @param p Filters.
+     * @return Unmodifiable collection of nodes.
+     */
+    private Collection<TcpDiscoveryNode> nodes(IgnitePredicate<? super TcpDiscoveryNode>... p) {
+        rwLock.readLock().lock();
+
+        try {
+            List<TcpDiscoveryNode> list = U.arrayList(nodes, p);
+
+            return Collections.unmodifiableCollection(list);
+        }
+        finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Gets server (non-client) nodes from topology. Callers in this class invoke it while
+     * holding {@code rwLock}.
+     *
+     * @param excluded Nodes to exclude from the search (optional).
+     * @return Collection of server nodes.
+     */
+    private Collection<TcpDiscoveryNode> serverNodes(@Nullable final Collection<TcpDiscoveryNode> excluded) {
+        final boolean excludedEmpty = F.isEmpty(excluded);
+
+        return F.view(nodes, new P1<TcpDiscoveryNode>() {
+            @Override public boolean apply(TcpDiscoveryNode node) {
+                return !node.isClient() && (excludedEmpty || !excluded.contains(node));
+            }
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        rwLock.readLock().lock();
+
+        try {
+            return S.toString(TcpDiscoveryNodesRing.class, this);
+        }
+        finally {
+            rwLock.readLock().unlock();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoverySpiState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoverySpiState.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoverySpiState.java
new file mode 100644
index 0000000..693ec41
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoverySpiState.java
@@ -0,0 +1,45 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.spi.discovery.tcp.internal;
+
+/**
+ * State of local node {@link org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi}.
+ */
+public enum TcpDiscoverySpiState {
+    /*
+     * NOTE(review): the per-constant descriptions below are inferred from the constant names;
+     * the transitions themselves are driven by TcpDiscoverySpi, which is not part of this file.
+     * Keep the declaration order stable: it defines each constant's ordinal().
+     */
+
+    /** Not connected to the topology. */
+    DISCONNECTED,
+
+    /** Connecting to the topology. */
+    CONNECTING,
+
+    /** Connected to the topology. */
+    CONNECTED,
+
+    /** Disconnecting from the topology. */
+    DISCONNECTING,
+
+    /** Stopping. */
+    STOPPING,
+
+    /** Left the topology. */
+    LEFT,
+
+    /** Duplicate node ID detected. */
+    DUPLICATE_ID,
+
+    /** Authentication failed. */
+    AUTH_FAILED,
+
+    /** Check failed. */
+    CHECK_FAILED,
+
+    /** Loopback address problem detected. */
+    LOOPBACK_PROBLEM;
+}