// http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/191aae27/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
// ----------------------------------------------------------------------
// diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpDiscoverySpiAdapter.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
// new file mode 100644
// index 0000000..2945352
// --- /dev/null
// +++ b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
// @@ -0,0 +1,997 @@

/* @java.file.header */

/*  _________        _____ __________________        _____
 *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
 *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
 *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
 *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
 */

package org.gridgain.grid.spi.discovery.tcp;

import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.marshaller.*;
import org.apache.ignite.marshaller.jdk.*;
import org.apache.ignite.product.*;
import org.apache.ignite.resources.*;
import org.apache.ignite.spi.*;
import org.gridgain.grid.*;
import org.gridgain.grid.spi.discovery.*;
import org.gridgain.grid.spi.discovery.tcp.internal.*;
import org.gridgain.grid.spi.discovery.tcp.ipfinder.*;
import org.gridgain.grid.spi.discovery.tcp.ipfinder.multicast.*;
import org.gridgain.grid.spi.discovery.tcp.messages.*;
import org.gridgain.grid.util.*;
import org.gridgain.grid.util.io.*;
import org.gridgain.grid.util.typedef.*;
import org.gridgain.grid.util.typedef.internal.*;
import org.jetbrains.annotations.*;

import java.io.*;
import java.net.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

import static org.gridgain.grid.spi.discovery.tcp.internal.TcpDiscoverySpiState.*;

/**
 * Base class for TCP discovery SPIs.
 * <p>
 * Holds the configuration shared by TCP discovery implementations (timeouts, thread priority,
 * IP finder, topology history size) together with the socket helpers they rely on: opening
 * a connection, writing under a write timeout enforced by {@link SocketTimeoutWorker}, and
 * reading under a temporary socket read timeout.
 */
abstract class TcpDiscoverySpiAdapter extends IgniteSpiAdapter implements DiscoverySpi {
    /** Default port to listen (value is <tt>47500</tt>). */
    public static final int DFLT_PORT = 47500;

    /** Default socket operations timeout in milliseconds (value is <tt>2,000ms</tt>). */
    public static final long DFLT_SOCK_TIMEOUT = 2000;

    /** Default timeout for receiving message acknowledgement in milliseconds (value is <tt>5,000ms</tt>). */
    public static final long DFLT_ACK_TIMEOUT = 5000;

    /** Default network timeout in milliseconds (value is <tt>5,000ms</tt>). */
    public static final long DFLT_NETWORK_TIMEOUT = 5000;

    /** Default value for thread priority (value is <tt>10</tt>). */
    public static final int DFLT_THREAD_PRI = 10;

    /** Default heartbeat messages issuing frequency (value is <tt>2,000ms</tt>). */
    public static final long DFLT_HEARTBEAT_FREQ = 2000;

    /** Default size of topology snapshots history. */
    public static final int DFLT_TOP_HISTORY_SIZE = 1000;

    /** Response OK. */
    protected static final int RES_OK = 1;

    /** Response CONTINUE JOIN. */
    protected static final int RES_CONTINUE_JOIN = 100;

    /** Response WAIT. */
    protected static final int RES_WAIT = 200;

    /** Local address. */
    protected String locAddr;

    /** IP finder. */
    protected GridTcpDiscoveryIpFinder ipFinder;

    /** Socket operations timeout. */
    protected long sockTimeout = DFLT_SOCK_TIMEOUT;

    /** Message acknowledgement timeout. */
    protected long ackTimeout = DFLT_ACK_TIMEOUT;

    /** Network timeout. */
    protected long netTimeout = DFLT_NETWORK_TIMEOUT;

    /** Thread priority for all threads started by SPI. */
    protected int threadPri = DFLT_THREAD_PRI;

    /** Heartbeat messages issuing frequency. */
    protected long hbFreq = DFLT_HEARTBEAT_FREQ;

    /** Size of topology snapshots history. */
    protected int topHistSize = DFLT_TOP_HISTORY_SIZE;

    /** Grid discovery listener. */
    protected volatile DiscoverySpiListener lsnr;

    /** Data exchange. */
    protected DiscoverySpiDataExchange exchange;

    /** Metrics provider. */
    protected DiscoveryMetricsProvider metricsProvider;

    /** Local node attributes. */
    protected Map<String, Object> locNodeAttrs;

    /** Local node version. */
    protected IgniteProductVersion locNodeVer;

    /** Local node. */
    protected TcpDiscoveryNode locNode;

    /** Local host. */
    protected InetAddress locHost;

    /** Internal and external addresses of local node. */
    protected Collection<InetSocketAddress> locNodeAddrs;

    /** Socket timeout worker. */
    protected SocketTimeoutWorker sockTimeoutWorker;

    /** Discovery state. */
    protected TcpDiscoverySpiState spiState = DISCONNECTED;

    /** Start time of the very first grid node. */
    protected volatile long gridStartTime;

    /** Marshaller. */
    protected final IgniteMarshaller marsh = new IgniteJdkMarshaller();

    /** Statistics. */
    protected final TcpDiscoveryStatistics stats = new TcpDiscoveryStatistics();

    /** Local node ID. */
    @IgniteLocalNodeIdResource
    protected UUID locNodeId;

    /** Name of the grid. */
    @IgniteNameResource
    protected String gridName;

    /** Logger. */
    @IgniteLoggerResource
    protected IgniteLogger log;

    /**
     * Sets local host IP address that discovery SPI uses.
     * <p>
     * If not provided, by default a first found non-loopback address
     * will be used. If there is no non-loopback address available,
     * then {@link InetAddress#getLocalHost()} will be used.
     *
     * @param locAddr IP address.
     */
    @IgniteSpiConfiguration(optional = true)
    @IgniteLocalHostResource
    public void setLocalAddress(String locAddr) {
        // Injection should not override value already set by Spring or user.
        if (this.locAddr == null)
            this.locAddr = locAddr;
    }

    /**
     * Gets local address that was set to SPI with {@link #setLocalAddress(String)} method.
     *
     * @return local address.
     */
    public String getLocalAddress() {
        return locAddr;
    }

    /**
     * Gets IP finder for IP addresses sharing and storing.
     *
     * @return IP finder for IP addresses sharing and storing.
     */
    public GridTcpDiscoveryIpFinder getIpFinder() {
        return ipFinder;
    }

    /**
     * Sets IP finder for IP addresses sharing and storing.
     * <p>
     * If not provided {@link GridTcpDiscoveryMulticastIpFinder} will be used by default.
     *
     * @param ipFinder IP finder.
     */
    @IgniteSpiConfiguration(optional = true)
    public void setIpFinder(GridTcpDiscoveryIpFinder ipFinder) {
        this.ipFinder = ipFinder;
    }

    /**
     * Sets socket operations timeout. This timeout is used to limit connection time and
     * write-to-socket time.
     * <p>
     * Note that when running GridGain on Amazon EC2, socket timeout must be set to a value
     * significantly greater than the default (e.g. to {@code 30000}).
     * <p>
     * If not specified, default is {@link #DFLT_SOCK_TIMEOUT}.
     *
     * @param sockTimeout Socket connection timeout.
     */
    @IgniteSpiConfiguration(optional = true)
    public void setSocketTimeout(long sockTimeout) {
        this.sockTimeout = sockTimeout;
    }

    /**
     * Sets timeout for receiving acknowledgement for sent message.
     * <p>
     * If acknowledgement is not received within this timeout, sending is considered as failed
     * and SPI tries to repeat message sending.
     * <p>
     * If not specified, default is {@link #DFLT_ACK_TIMEOUT}.
     *
     * @param ackTimeout Acknowledgement timeout.
     */
    @IgniteSpiConfiguration(optional = true)
    public void setAckTimeout(long ackTimeout) {
        this.ackTimeout = ackTimeout;
    }

    /**
     * Sets maximum network timeout to use for network operations.
     * <p>
     * If not specified, default is {@link #DFLT_NETWORK_TIMEOUT}.
     *
     * @param netTimeout Network timeout.
     */
    @IgniteSpiConfiguration(optional = true)
    public void setNetworkTimeout(long netTimeout) {
        this.netTimeout = netTimeout;
    }

    /**
     * Sets thread priority. All threads within SPI will be started with it.
     * <p>
     * If not provided, default value is {@link #DFLT_THREAD_PRI}
     *
     * @param threadPri Thread priority.
     */
    @IgniteSpiConfiguration(optional = true)
    public void setThreadPriority(int threadPri) {
        this.threadPri = threadPri;
    }

    /**
     * Sets delay between issuing of heartbeat messages. SPI sends heartbeat messages
     * in configurable time interval to other nodes to notify them about its state.
     * <p>
     * If not provided, default value is {@link #DFLT_HEARTBEAT_FREQ}.
     *
     * @param hbFreq Heartbeat frequency in milliseconds.
     */
    @IgniteSpiConfiguration(optional = true)
    public void setHeartbeatFrequency(long hbFreq) {
        this.hbFreq = hbFreq;
    }

    /**
     * @return Size of topology snapshots history.
     */
    public long getTopHistorySize() {
        return topHistSize;
    }

    /**
     * Sets size of topology snapshots history. Specified size should be greater than or equal to default size
     * {@link #DFLT_TOP_HISTORY_SIZE}. A smaller value is rejected: a warning is logged and the
     * current size is kept.
     *
     * @param topHistSize Size of topology snapshots history.
     */
    @IgniteSpiConfiguration(optional = true)
    public void setTopHistorySize(int topHistSize) {
        if (topHistSize < DFLT_TOP_HISTORY_SIZE) {
            U.warn(log, "Topology history size should be greater than or equal to default size. " +
                "Specified size will not be set [curSize=" + this.topHistSize + ", specifiedSize=" + topHistSize +
                ", defaultSize=" + DFLT_TOP_HISTORY_SIZE + ']');

            return;
        }

        this.topHistSize = topHistSize;
    }

    /** {@inheritDoc} */
    @Override public void setNodeAttributes(Map<String, Object> attrs, IgniteProductVersion ver) {
        // Attributes and version are expected to be assigned exactly once.
        assert locNodeAttrs == null;
        assert locNodeVer == null;

        if (log.isDebugEnabled()) {
            log.debug("Node attributes to set: " + attrs);
            log.debug("Node version to set: " + ver);
        }

        locNodeAttrs = attrs;
        locNodeVer = ver;
    }

    /** {@inheritDoc} */
    @Override protected void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException {
        super.onContextInitialized0(spiCtx);

        ipFinder.onSpiContextInitialized(spiCtx);
    }

    /** {@inheritDoc} */
    @Override protected void onContextDestroyed0() {
        super.onContextDestroyed0();

        // The IP finder is optional configuration and is never assigned by this class, so it can
        // still be unset when the context is torn down; do not fail the teardown in that case.
        if (ipFinder != null)
            ipFinder.onSpiContextDestroyed();
    }

    /** {@inheritDoc} */
    @Override public ClusterNode getLocalNode() {
        return locNode;
    }

    /** {@inheritDoc} */
    @Override public void setListener(@Nullable DiscoverySpiListener lsnr) {
        this.lsnr = lsnr;
    }

    /** {@inheritDoc} */
    @Override public void setDataExchange(DiscoverySpiDataExchange exchange) {
        this.exchange = exchange;
    }

    /** {@inheritDoc} */
    @Override public void setMetricsProvider(DiscoveryMetricsProvider metricsProvider) {
        this.metricsProvider = metricsProvider;
    }

    /** {@inheritDoc} */
    @Override public long getGridStartTime() {
        assert gridStartTime != 0;

        return gridStartTime;
    }

    /**
     * Converts a timeout in milliseconds to the {@code int} expected by socket APIs,
     * clamping values above {@link Integer#MAX_VALUE} instead of letting the cast overflow.
     *
     * @param timeout Timeout in milliseconds.
     * @return Same timeout as {@code int}, at most {@link Integer#MAX_VALUE}.
     */
    private static int intTimeout(long timeout) {
        return (int)Math.min(timeout, Integer.MAX_VALUE);
    }

    /**
     * Opens a socket to the given address: binds it to the local host, connects within the
     * socket timeout and writes the {@code U.GG_HEADER} bytes.
     * <p>
     * If any of these steps fails the socket is closed before the error is propagated.
     *
     * @param sockAddr Remote address.
     * @return Opened socket.
     * @throws IOException If failed.
     */
    protected Socket openSocket(InetSocketAddress sockAddr) throws IOException {
        assert sockAddr != null;

        InetSocketAddress resolved = sockAddr.isUnresolved() ?
            new InetSocketAddress(InetAddress.getByName(sockAddr.getHostName()), sockAddr.getPort()) : sockAddr;

        InetAddress addr = resolved.getAddress();

        assert addr != null;

        Socket sock = new Socket();

        try {
            sock.bind(new InetSocketAddress(locHost, 0));

            sock.setTcpNoDelay(true);

            sock.connect(resolved, intTimeout(sockTimeout));

            writeToSocket(sock, U.GG_HEADER);

            return sock;
        }
        catch (IOException | RuntimeException e) {
            // Caller never receives the socket on failure, so it must be released here.
            U.closeQuiet(sock);

            throw e;
        }
    }

    /**
     * Completes a guarded socket write: cancels the write timeout and reports the outcome.
     *
     * @param obj Timeout object that was registered for the write.
     * @param err I/O error raised by the write itself, or {@code null} if none was raised.
     * @throws IOException The original write error if there was one, otherwise
     *      {@link SocketTimeoutException} if the timeout object had already been processed
     *      by the timeout worker (which closes the socket).
     */
    private void finishWrite(SocketTimeoutObject obj, @Nullable IOException err) throws IOException {
        boolean cancelled = obj.cancel();

        if (cancelled)
            sockTimeoutWorker.removeTimeoutObject(obj);

        // Throw original exception.
        if (err != null)
            throw err;

        if (!cancelled)
            throw new SocketTimeoutException("Write timed out (socket was concurrently closed).");
    }

    /**
     * Writes raw data to the socket, limiting the write by the socket timeout.
     *
     * @param sock Socket.
     * @param data Raw data to write.
     * @throws IOException If IO failed or write timed out.
     */
    @SuppressWarnings("ThrowFromFinallyBlock")
    protected void writeToSocket(Socket sock, byte[] data) throws IOException {
        assert sock != null;
        assert data != null;

        SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout);

        sockTimeoutWorker.addTimeoutObject(obj);

        IOException err = null;

        try {
            OutputStream out = sock.getOutputStream();

            out.write(data);

            out.flush();
        }
        catch (IOException e) {
            err = e;
        }
        finally {
            finishWrite(obj, err);
        }
    }

    /**
     * Writes message to the socket using a freshly allocated 8K marshalling buffer.
     *
     * @param sock Socket.
     * @param msg Message.
     * @throws IOException If IO failed or write timed out.
     * @throws GridException If marshalling failed.
     */
    protected void writeToSocket(Socket sock, GridTcpDiscoveryAbstractMessage msg) throws IOException, GridException {
        writeToSocket(sock, msg, new GridByteArrayOutputStream(8 * 1024)); // 8K.
    }

    /**
     * Marshals the message into the given buffer and writes the buffer to the socket,
     * limiting the write by the socket timeout.
     *
     * @param sock Socket.
     * @param msg Message.
     * @param bout Byte array output stream.
     * @throws IOException If IO failed or write timed out.
     * @throws GridException If marshalling failed.
     */
    @SuppressWarnings("ThrowFromFinallyBlock")
    protected void writeToSocket(Socket sock, GridTcpDiscoveryAbstractMessage msg, GridByteArrayOutputStream bout)
        throws IOException, GridException {
        assert sock != null;
        assert msg != null;
        assert bout != null;

        // Marshall message first to perform only write after.
        marsh.marshal(msg, bout);

        SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout);

        sockTimeoutWorker.addTimeoutObject(obj);

        IOException err = null;

        try {
            OutputStream out = sock.getOutputStream();

            bout.writeTo(out);

            out.flush();
        }
        catch (IOException e) {
            err = e;
        }
        finally {
            finishWrite(obj, err);
        }
    }

    /**
     * Writes response to the socket, limiting the write by the socket timeout.
     *
     * @param sock Socket.
     * @param res Integer response.
     * @throws IOException If IO failed or write timed out.
     */
    @SuppressWarnings("ThrowFromFinallyBlock")
    protected void writeToSocket(Socket sock, int res) throws IOException {
        assert sock != null;

        SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout);

        sockTimeoutWorker.addTimeoutObject(obj);

        IOException err = null;

        try {
            // Obtain the stream inside the guarded section: if this throws, the timeout object
            // registered above must still be cancelled, as in the other overloads.
            OutputStream out = sock.getOutputStream();

            out.write(res);

            out.flush();
        }
        catch (IOException e) {
            err = e;
        }
        finally {
            finishWrite(obj, err);
        }
    }

    /**
     * Reads message from the socket limiting read time. The socket's previous read timeout
     * is restored afterwards.
     *
     * @param sock Socket.
     * @param in Input stream (in case socket stream was wrapped).
     * @param timeout Socket timeout for this operation.
     * @return Message.
     * @throws IOException If IO failed or read timed out.
     * @throws GridException If unmarshalling failed.
     */
    protected <T> T readMessage(Socket sock, @Nullable InputStream in, long timeout) throws IOException, GridException {
        assert sock != null;

        int oldTimeout = sock.getSoTimeout();

        try {
            sock.setSoTimeout(intTimeout(timeout));

            return marsh.unmarshal(in == null ? sock.getInputStream() : in, U.gridClassLoader());
        }
        catch (IOException | GridException e) {
            if (X.hasCause(e, SocketTimeoutException.class))
                LT.warn(log, null, "Timed out waiting for message to be read (most probably, the reason is " +
                    "in long GC pauses on remote node). Current timeout: " + timeout + '.');

            throw e;
        }
        finally {
            // Quietly restore timeout.
            try {
                sock.setSoTimeout(oldTimeout);
            }
            catch (SocketException ignored) {
                // No-op.
            }
        }
    }

    /**
     * Reads message delivery receipt from the socket. The socket's previous read timeout
     * is restored afterwards.
     *
     * @param sock Socket.
     * @param timeout Socket timeout for this operation.
     * @return Receipt.
     * @throws IOException If IO failed, the stream ended before a receipt was read,
     *      or read timed out.
     */
    protected int readReceipt(Socket sock, long timeout) throws IOException {
        assert sock != null;

        int oldTimeout = sock.getSoTimeout();

        try {
            sock.setSoTimeout(intTimeout(timeout));

            int res = sock.getInputStream().read();

            if (res == -1)
                throw new EOFException();

            return res;
        }
        catch (SocketTimeoutException e) {
            LT.warn(log, null, "Timed out waiting for message delivery receipt (most probably, the reason is " +
                "in long GC pauses on remote node; consider tuning GC and increasing 'ackTimeout' " +
                "configuration property). Will retry to send message with increased timeout. " +
                "Current timeout: " + timeout + '.');

            stats.onAckTimeout();

            throw e;
        }
        finally {
            // Quietly restore timeout.
            try {
                sock.setSoTimeout(oldTimeout);
            }
            catch (SocketException ignored) {
                // No-op.
            }
        }
    }

    /**
     * Resolves addresses registered in the IP finder, removes local node addresses
     * and returns the resulting collection in random order.
     * <p>
     * Retries every 2000 ms until the registered addresses can be obtained from the IP finder.
     * Addresses whose host cannot be resolved are still included, unresolved.
     *
     * @return Resolved addresses without local address (potentially
     *      empty but never null).
     * @throws org.apache.ignite.spi.IgniteSpiException If an error occurs.
     */
    protected Collection<InetSocketAddress> resolvedAddresses() throws IgniteSpiException {
        List<InetSocketAddress> res = new ArrayList<>();

        Collection<InetSocketAddress> addrs;

        // Get consistent addresses collection.
        while (true) {
            try {
                addrs = registeredAddresses();

                break;
            }
            catch (IgniteSpiException e) {
                LT.error(log, e, "Failed to get registered addresses from IP finder on start " +
                    "(retrying every 2000 ms).");
            }

            try {
                U.sleep(2000);
            }
            catch (GridInterruptedException e) {
                throw new IgniteSpiException("Thread has been interrupted.", e);
            }
        }

        for (InetSocketAddress addr : addrs) {
            assert addr != null;

            try {
                InetSocketAddress resolved = addr.isUnresolved() ?
                    new InetSocketAddress(InetAddress.getByName(addr.getHostName()), addr.getPort()) : addr;

                if (locNodeAddrs == null || !locNodeAddrs.contains(resolved))
                    res.add(resolved);
            }
            catch (UnknownHostException ignored) {
                LT.warn(log, null, "Failed to resolve address from IP finder (host is unknown): " + addr);

                // Add address in any case.
                res.add(addr);
            }
        }

        if (!res.isEmpty())
            Collections.shuffle(res);

        return res;
    }

    /**
     * Gets addresses registered in the IP finder, initializes addresses having no
     * port (or 0 port) with {@link #DFLT_PORT}.
     *
     * @return Registered addresses.
     * @throws org.apache.ignite.spi.IgniteSpiException If an error occurs.
     */
    protected Collection<InetSocketAddress> registeredAddresses() throws IgniteSpiException {
        Collection<InetSocketAddress> res = new LinkedList<>();

        for (InetSocketAddress addr : ipFinder.getRegisteredAddresses()) {
            if (addr.getPort() == 0)
                addr = addr.isUnresolved() ? new InetSocketAddress(addr.getHostName(), DFLT_PORT) :
                    new InetSocketAddress(addr.getAddress(), DFLT_PORT);

            res.add(addr);
        }

        return res;
    }

    /**
     * Creates the error reported when the local node ID duplicates an existing node's ID.
     *
     * @param msg Message.
     * @return Error.
     */
    protected IgniteSpiException duplicateIdError(GridTcpDiscoveryDuplicateIdMessage msg) {
        assert msg != null;

        return new IgniteSpiException("Local node has the same ID as existing node in topology " +
            "(fix configuration and restart local node) [localNode=" + locNode +
            ", existingNode=" + msg.node() + ']');
    }

    /**
     * Creates the error reported when authentication failed.
     *
     * @param msg Message.
     * @return Error.
     */
    protected IgniteSpiException authenticationFailedError(GridTcpDiscoveryAuthFailedMessage msg) {
        assert msg != null;

        return new IgniteSpiException(new GridAuthenticationException("Authentication failed [nodeId=" +
            msg.creatorNodeId() + ", addr=" + msg.address().getHostAddress() + ']'));
    }

    /**
     * Creates the error for a failed check; a version check failure yields
     * {@link IgniteSpiVersionCheckException}.
     *
     * @param msg Message.
     * @return Error.
     */
    protected IgniteSpiException checkFailedError(GridTcpDiscoveryCheckFailedMessage msg) {
        assert msg != null;

        return versionCheckFailed(msg) ? new IgniteSpiVersionCheckException(msg.error()) :
            new IgniteSpiException(msg.error());
    }

    /**
     * @param msg Message.
     * @return Whether delivery of the message is ensured.
     */
    protected boolean ensured(GridTcpDiscoveryAbstractMessage msg) {
        return U.getAnnotation(msg.getClass(), GridTcpDiscoveryEnsureDelivery.class) != null;
    }

    /**
     * @param msg Failed message.
     * @return {@code True} if specified failed message relates to version incompatibility, {@code false} otherwise.
     * @deprecated Parsing of error message was used for preserving backward compatibility. We should remove it
     *      and create separate message for failed version check with next major release.
     */
    @Deprecated
    private static boolean versionCheckFailed(GridTcpDiscoveryCheckFailedMessage msg) {
        return msg.error().contains("versions are not compatible");
    }

    /**
     * Handles sockets timeouts: closes sockets whose registered write deadline has passed.
     */
    protected class SocketTimeoutWorker extends IgniteSpiThread {
        /** Time-based sorted set for timeout objects (ties are broken by object ID). */
        private final GridConcurrentSkipListSet<SocketTimeoutObject> timeoutObjs =
            new GridConcurrentSkipListSet<>(new Comparator<SocketTimeoutObject>() {
                @Override public int compare(SocketTimeoutObject o1, SocketTimeoutObject o2) {
                    long time1 = o1.endTime();
                    long time2 = o2.endTime();

                    long id1 = o1.id();
                    long id2 = o2.id();

                    return time1 < time2 ? -1 : time1 > time2 ? 1 :
                        id1 < id2 ? -1 : id1 > id2 ? 1 : 0;
                }
            });

        /** Mutex. */
        private final Object mux0 = new Object();

        /**
         *
         */
        SocketTimeoutWorker() {
            super(gridName, "tcp-disco-sock-timeout-worker", log);

            setPriority(threadPri);
        }

        /**
         * Registers a timeout object and wakes the worker up if the new object
         * became the earliest one.
         *
         * @param timeoutObj Timeout object to add.
         */
        @SuppressWarnings({"NakedNotify"})
        public void addTimeoutObject(SocketTimeoutObject timeoutObj) {
            assert timeoutObj != null && timeoutObj.endTime() > 0 && timeoutObj.endTime() != Long.MAX_VALUE;

            timeoutObjs.add(timeoutObj);

            if (timeoutObjs.firstx() == timeoutObj) {
                synchronized (mux0) {
                    mux0.notifyAll();
                }
            }
        }

        /**
         * @param timeoutObj Timeout object to remove.
         */
        public void removeTimeoutObject(SocketTimeoutObject timeoutObj) {
            assert timeoutObj != null;

            timeoutObjs.remove(timeoutObj);
        }

        /** {@inheritDoc} */
        @Override protected void body() throws InterruptedException {
            if (log.isDebugEnabled())
                log.debug("Socket timeout worker has been started.");

            while (!isInterrupted()) {
                long now = U.currentTimeMillis();

                // Objects are ordered by end time, so stop at the first one that has not expired.
                for (Iterator<SocketTimeoutObject> iter = timeoutObjs.iterator(); iter.hasNext(); ) {
                    SocketTimeoutObject timeoutObj = iter.next();

                    if (timeoutObj.endTime() <= now) {
                        iter.remove();

                        if (timeoutObj.onTimeout()) {
                            LT.warn(log, null, "Socket write has timed out (consider increasing " +
                                "'sockTimeout' configuration property) [sockTimeout=" + sockTimeout + ']');

                            stats.onSocketTimeout();
                        }
                    }
                    else
                        break;
                }

                synchronized (mux0) {
                    while (true) {
                        // Access of the first element must be inside of
                        // synchronization block, so we don't miss out
                        // on thread notification events sent from
                        // 'addTimeoutObject(..)' method.
                        SocketTimeoutObject first = timeoutObjs.firstx();

                        if (first != null) {
                            long waitTime = first.endTime() - U.currentTimeMillis();

                            if (waitTime > 0)
                                mux0.wait(waitTime);
                            else
                                break;
                        }
                        else
                            mux0.wait(5000);
                    }
                }
            }
        }
    }

    /**
     * Socket timeout object: a socket paired with the deadline of an operation on it.
     * Exactly one of {@link #cancel()} and {@link #onTimeout()} can succeed for an instance.
     */
    protected static class SocketTimeoutObject {
        /** */
        private static final AtomicLong idGen = new AtomicLong();

        /** */
        private final long id = idGen.incrementAndGet();

        /** */
        private final Socket sock;

        /** */
        private final long endTime;

        /** */
        private final AtomicBoolean done = new AtomicBoolean();

        /**
         * @param sock Socket.
         * @param endTime End time.
         */
        SocketTimeoutObject(Socket sock, long endTime) {
            assert sock != null;
            assert endTime > 0;

            this.sock = sock;
            this.endTime = endTime;
        }

        /**
         * @return {@code True} if object has not yet been processed.
         */
        boolean cancel() {
            return done.compareAndSet(false, true);
        }

        /**
         * Closes the socket unless the object has already been cancelled or processed.
         *
         * @return {@code True} if object has not yet been canceled.
         */
        boolean onTimeout() {
            if (done.compareAndSet(false, true)) {
                // Close socket - timeout occurred.
                U.closeQuiet(sock);

                return true;
            }

            return false;
        }

        /**
         * @return End time.
         */
        long endTime() {
            return endTime;
        }

        /**
         * @return ID.
         */
        long id() {
            return id;
        }

        /** {@inheritDoc} */
        @Override public String toString() {
            return S.toString(SocketTimeoutObject.class, this);
        }
    }

    /**
     * Base class for message workers: a thread that takes messages from its queue
     * and passes them to {@link #processMessage(GridTcpDiscoveryAbstractMessage)}.
     */
    protected abstract class MessageWorkerAdapter extends IgniteSpiThread {
        /** Pre-allocated output stream (100K). */
        private final GridByteArrayOutputStream bout = new GridByteArrayOutputStream(100 * 1024);

        /** Message queue. */
        private final BlockingDeque<GridTcpDiscoveryAbstractMessage> queue = new LinkedBlockingDeque<>();

        /** Backed interrupted flag. */
        private volatile boolean interrupted;

        /**
         * @param name Thread name.
         */
        protected MessageWorkerAdapter(String name) {
            super(gridName, name, log);

            setPriority(threadPri);
        }

        /** {@inheritDoc} */
        @Override protected void body() throws InterruptedException {
            if (log.isDebugEnabled())
                log.debug("Message worker started [locNodeId=" + locNodeId + ']');

            while (!isInterrupted()) {
                // Bounded wait so that the interruption flag is re-checked periodically.
                GridTcpDiscoveryAbstractMessage msg = queue.poll(2000, TimeUnit.MILLISECONDS);

                if (msg == null)
                    continue;

                processMessage(msg);
            }
        }

        /** {@inheritDoc} */
        @Override public void interrupt() {
            interrupted = true;

            super.interrupt();
        }

        /** {@inheritDoc} */
        @Override public boolean isInterrupted() {
            return interrupted || super.isInterrupted();
        }

        /**
         * @return Current queue size.
         */
        int queueSize() {
            return queue.size();
        }

        /**
         * Adds message to queue. Heartbeat messages are placed at the head of the queue,
         * all other messages at the tail.
         *
         * @param msg Message to add.
         */
        void addMessage(GridTcpDiscoveryAbstractMessage msg) {
            assert msg != null;

            if (msg instanceof GridTcpDiscoveryHeartbeatMessage)
                queue.addFirst(msg);
            else
                queue.add(msg);

            if (log.isDebugEnabled())
                log.debug("Message has been added to queue: " + msg);
        }

        /**
         * Processes a message taken from the queue.
         *
         * @param msg Message to process.
         */
        protected abstract void processMessage(GridTcpDiscoveryAbstractMessage msg);

        /**
         * Writes message to the socket, reusing this worker's pre-allocated buffer.
         *
         * @param sock Socket.
         * @param msg Message.
         * @throws IOException If IO failed.
         * @throws GridException If marshalling failed.
         */
        protected final void writeToSocket(Socket sock, GridTcpDiscoveryAbstractMessage msg)
            throws IOException, GridException {
            bout.reset();

            TcpDiscoverySpiAdapter.this.writeToSocket(sock, msg, bout);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/191aae27/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpDiscoverySpiMBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpDiscoverySpiMBean.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpDiscoverySpiMBean.java new file mode 100644 index 0000000..0f9212d --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpDiscoverySpiMBean.java @@ -0,0 +1,268 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.spi.discovery.tcp; + +import org.apache.ignite.mbean.*; +import org.apache.ignite.spi.*; +import org.gridgain.grid.spi.discovery.tcp.ipfinder.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Management bean for {@link TcpDiscoverySpi}. + */ +public interface TcpDiscoverySpiMBean extends IgniteSpiManagementMBean { + /** + * Gets delay between heartbeat messages sent by coordinator. + * + * @return Time period in milliseconds. + */ + @IgniteMBeanDescription("Heartbeat frequency.") + public long getHeartbeatFrequency(); + + /** + * Gets current SPI state. + * + * @return Current SPI state. + */ + @IgniteMBeanDescription("SPI state.") + public String getSpiState(); + + /** + * Gets {@link GridTcpDiscoveryIpFinder} (string representation). + * + * @return IPFinder (string representation). + */ + @IgniteMBeanDescription("IP Finder.") + public String getIpFinderFormatted(); + + /** + * Gets number of connection attempts. + * + * @return Number of connection attempts. 
+ */ + @IgniteMBeanDescription("Reconnect count.") + public int getReconnectCount(); + + /** + * Gets network timeout. + * + * @return Network timeout. + */ + @IgniteMBeanDescription("Network timeout.") + public long getNetworkTimeout(); + + /** + * Gets local TCP port SPI listens to. + * + * @return Local port range. + */ + @IgniteMBeanDescription("Local TCP port.") + public int getLocalPort(); + + /** + * Gets local TCP port range. + * + * @return Local port range. + */ + @IgniteMBeanDescription("Local TCP port range.") + public int getLocalPortRange(); + + /** + * Gets max heartbeats count node can miss without initiating status check. + * + * @return Max missed heartbeats. + */ + @IgniteMBeanDescription("Max missed heartbeats.") + public int getMaxMissedHeartbeats(); + + /** + * Gets max heartbeats count node can miss without failing client node. + * + * @return Max missed client heartbeats. + */ + @IgniteMBeanDescription("Max missed client heartbeats.") + public int getMaxMissedClientHeartbeats(); + + /** + * Gets thread priority. All threads within SPI will be started with it. + * + * @return Thread priority. + */ + @IgniteMBeanDescription("Threads priority.") + public int getThreadPriority(); + + /** + * Gets IP finder clean frequency. + * + * @return IP finder clean frequency. + */ + @IgniteMBeanDescription("IP finder clean frequency.") + public long getIpFinderCleanFrequency(); + + /** + * Gets statistics print frequency. + * + * @return Statistics print frequency in milliseconds. + */ + @IgniteMBeanDescription("Statistics print frequency.") + public long getStatisticsPrintFrequency(); + + /** + * Gets message worker queue current size. + * + * @return Message worker queue current size. + */ + @IgniteMBeanDescription("Message worker queue current size.") + public int getMessageWorkerQueueSize(); + + /** + * Gets joined nodes count. + * + * @return Nodes joined count. 
+ */ + @IgniteMBeanDescription("Nodes joined count.") + public long getNodesJoined(); + + /** + * Gets left nodes count. + * + * @return Left nodes count. + */ + @IgniteMBeanDescription("Nodes left count.") + public long getNodesLeft(); + + /** + * Gets failed nodes count. + * + * @return Failed nodes count. + */ + @IgniteMBeanDescription("Nodes failed count.") + public long getNodesFailed(); + + /** + * Gets pending messages registered count. + * + * @return Pending messages registered count. + */ + @IgniteMBeanDescription("Pending messages registered.") + public long getPendingMessagesRegistered(); + + /** + * Gets pending messages discarded count. + * + * @return Pending messages registered count. + */ + @IgniteMBeanDescription("Pending messages discarded.") + public long getPendingMessagesDiscarded(); + + /** + * Gets avg message processing time. + * + * @return Avg message processing time. + */ + @IgniteMBeanDescription("Avg message processing time.") + public long getAvgMessageProcessingTime(); + + /** + * Gets max message processing time. + * + * @return Max message processing time. + */ + @IgniteMBeanDescription("Max message processing time.") + public long getMaxMessageProcessingTime(); + + /** + * Gets total received messages count. + * + * @return Total received messages count. + */ + @IgniteMBeanDescription("Total received messages count.") + public int getTotalReceivedMessages(); + + /** + * Gets received messages counts (grouped by type). + * + * @return Map containing message types and respective counts. + */ + @IgniteMBeanDescription("Received messages by type.") + public Map<String, Integer> getReceivedMessages(); + + /** + * Gets total processed messages count. + * + * @return Total processed messages count. + */ + @IgniteMBeanDescription("Total processed messages count.") + public int getTotalProcessedMessages(); + + /** + * Gets processed messages counts (grouped by type). + * + * @return Map containing message types and respective counts. 
+ */ + @IgniteMBeanDescription("Received messages by type.") + public Map<String, Integer> getProcessedMessages(); + + /** + * Gets time local node has been coordinator since. + * + * @return Time local node is coordinator since. + */ + @IgniteMBeanDescription("Local node is coordinator since.") + public long getCoordinatorSinceTimestamp(); + + /** + * Gets current coordinator. + * + * @return Current coordinator node ID. + */ + @IgniteMBeanDescription("Coordinator node ID.") + @Nullable public UUID getCoordinator(); + + /** + * Gets message acknowledgement timeout. + * + * @return Message acknowledgement timeout. + */ + @IgniteMBeanDescription("Message acknowledgement timeout.") + public long getAckTimeout(); + + /** + * Gets maximum message acknowledgement timeout. + * + * @return Maximum message acknowledgement timeout. + */ + @IgniteMBeanDescription("Maximum message acknowledgement timeout.") + public long getMaxAckTimeout(); + + /** + * Gets socket timeout. + * + * @return Socket timeout. + */ + @IgniteMBeanDescription("Socket timeout.") + public long getSocketTimeout(); + + /** + * Gets join timeout. + * + * @return Join timeout. + */ + @IgniteMBeanDescription("Join timeout.") + public long getJoinTimeout(); + + /** + * Dumps debug info using configured logger. 
+ */ + @IgniteMBeanDescription("Dump debug info.") + public void dumpDebugInfo(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/191aae27/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/GridTcpDiscoveryNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/GridTcpDiscoveryNode.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/GridTcpDiscoveryNode.java deleted file mode 100644 index 3469973..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/GridTcpDiscoveryNode.java +++ /dev/null @@ -1,443 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.discovery.tcp.internal; - -import org.apache.ignite.cluster.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.product.*; -import org.gridgain.grid.kernal.*; -import org.gridgain.grid.spi.discovery.*; -import org.gridgain.grid.util.lang.*; -import org.gridgain.grid.util.tostring.*; -import org.gridgain.grid.util.typedef.*; -import org.gridgain.grid.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.net.*; -import java.util.*; - -import static org.gridgain.grid.kernal.GridNodeAttributes.*; - -/** - * Node for {@link org.gridgain.grid.spi.discovery.tcp.TcpDiscoverySpi}. - * <p> - * <strong>This class is not intended for public use</strong> and has been made - * <tt>public</tt> due to certain limitations of Java technology. 
- */ -public class GridTcpDiscoveryNode extends GridMetadataAwareAdapter implements ClusterNode, - Comparable<GridTcpDiscoveryNode>, Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Node ID. */ - private UUID id; - - /** Consistent ID. */ - private Object consistentId; - - /** Node attributes. */ - @GridToStringExclude - private Map<String, Object> attrs; - - /** Internal discovery addresses as strings. */ - @GridToStringInclude - private Collection<String> addrs; - - /** Internal discovery host names as strings. */ - private Collection<String> hostNames; - - /** */ - @GridToStringInclude - private Collection<InetSocketAddress> sockAddrs; - - /** */ - @GridToStringInclude - private int discPort; - - /** Node metrics. */ - @GridToStringExclude - private volatile ClusterNodeMetrics metrics; - - /** Node order in the topology. */ - private volatile long order; - - /** Node order in the topology (internal). */ - private volatile long intOrder; - - /** The most recent time when heartbeat message was received from the node. */ - @GridToStringExclude - private volatile long lastUpdateTime = U.currentTimeMillis(); - - /** Metrics provider (transient). */ - @GridToStringExclude - private DiscoveryMetricsProvider metricsProvider; - - /** Visible flag (transient). */ - @GridToStringExclude - private boolean visible; - - /** Grid local node flag (transient). */ - private boolean loc; - - /** Version. */ - private IgniteProductVersion ver; - - /** Alive check (used by clients). */ - @GridToStringExclude - private transient int aliveCheck; - - /** Client router node ID. */ - @GridToStringExclude - private UUID clientRouterNodeId; - - /** - * Public default no-arg constructor for {@link Externalizable} interface. - */ - public GridTcpDiscoveryNode() { - // No-op. - } - - /** - * Constructor. - * - * @param id Node Id. - * @param addrs Addresses. - * @param hostNames Host names. - * @param discPort Port. 
- * @param metricsProvider Metrics provider. - * @param ver Version. - */ - public GridTcpDiscoveryNode(UUID id, Collection<String> addrs, Collection<String> hostNames, int discPort, - DiscoveryMetricsProvider metricsProvider, IgniteProductVersion ver) { - assert id != null; - assert !F.isEmpty(addrs); - assert metricsProvider != null; - assert ver != null; - - this.id = id; - this.addrs = addrs; - this.hostNames = hostNames; - this.discPort = discPort; - this.metricsProvider = metricsProvider; - this.ver = ver; - - consistentId = U.consistentId(addrs, discPort); - - metrics = metricsProvider.getMetrics(); - sockAddrs = U.toSocketAddresses(this, discPort); - } - - /** {@inheritDoc} */ - @Override public UUID id() { - return id; - } - - /** {@inheritDoc} */ - @Override public Object consistentId() { - return consistentId; - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public <T> T attribute(String name) { - // Even though discovery SPI removes this attribute after authentication, keep this check for safety. - if (GridNodeAttributes.ATTR_SECURITY_CREDENTIALS.equals(name)) - return null; - - return (T)attrs.get(name); - } - - /** {@inheritDoc} */ - @Override public Map<String, Object> attributes() { - // Even though discovery SPI removes this attribute after authentication, keep this check for safety. - return F.view(attrs, new IgnitePredicate<String>() { - @Override public boolean apply(String s) { - return !GridNodeAttributes.ATTR_SECURITY_CREDENTIALS.equals(s); - } - }); - } - - /** - * Sets node attributes. - * - * @param attrs Node attributes. - */ - public void setAttributes(Map<String, Object> attrs) { - this.attrs = U.sealMap(attrs); - } - - /** - * Gets node attributes without filtering. - * - * @return Node attributes without filtering. 
- */ - public Map<String, Object> getAttributes() { - return attrs; - } - - /** {@inheritDoc} */ - @Override public ClusterNodeMetrics metrics() { - if (metricsProvider != null) - metrics = metricsProvider.getMetrics(); - - return metrics; - } - - /** - * Sets node metrics. - * - * @param metrics Node metrics. - */ - public void setMetrics(ClusterNodeMetrics metrics) { - assert metrics != null; - - this.metrics = metrics; - } - - /** - * @return Internal order. - */ - public long internalOrder() { - return intOrder; - } - - /** - * @param intOrder Internal order of the node. - */ - public void internalOrder(long intOrder) { - assert intOrder > 0; - - this.intOrder = intOrder; - } - - /** - * @return Order. - */ - @Override public long order() { - return order; - } - - /** - * @param order Order of the node. - */ - public void order(long order) { - assert order >= 0 : "Order is invalid: " + this; - - this.order = order; - } - - /** {@inheritDoc} */ - @Override public IgniteProductVersion version() { - return ver; - } - - /** - * @param ver Version. - */ - public void version(IgniteProductVersion ver) { - assert ver != null; - - this.ver = ver; - } - - /** {@inheritDoc} */ - @Override public Collection<String> addresses() { - return addrs; - } - - /** {@inheritDoc} */ - @Override public boolean isLocal() { - return loc; - } - - /** - * @param loc Grid local node flag. - */ - public void local(boolean loc) { - this.loc = loc; - } - - /** {@inheritDoc} */ - @Override public boolean isDaemon() { - return "true".equalsIgnoreCase((String)attribute(ATTR_DAEMON)); - } - - /** {@inheritDoc} */ - @Override public Collection<String> hostNames() { - return hostNames; - } - - /** - * @return Discovery port. - */ - public int discoveryPort() { - return discPort; - } - - /** - * @return Addresses that could be used by discovery. - */ - public Collection<InetSocketAddress> socketAddresses() { - return sockAddrs; - } - - /** - * Gets node last update time. 
- * - * @return Time of the last heartbeat. - */ - public long lastUpdateTime() { - return lastUpdateTime; - } - - /** - * Sets node last update. - * - * @param lastUpdateTime Time of last metrics update. - */ - public void lastUpdateTime(long lastUpdateTime) { - assert lastUpdateTime > 0; - - this.lastUpdateTime = lastUpdateTime; - } - - /** - * Gets visible flag. - * - * @return {@code true} if node is in visible state. - */ - public boolean visible() { - return visible; - } - - /** - * Sets visible flag. - * - * @param visible {@code true} if node is in visible state. - */ - public void visible(boolean visible) { - this.visible = visible; - } - - /** {@inheritDoc} */ - @Override public boolean isClient() { - return clientRouterNodeId != null; - } - - /** - * Decrements alive check value and returns new one. - * - * @return Alive check value. - */ - public int decrementAliveCheck() { - assert isClient(); - - return --aliveCheck; - } - - /** - * @param aliveCheck Alive check value. - */ - public void aliveCheck(int aliveCheck) { - assert isClient(); - - this.aliveCheck = aliveCheck; - } - - /** - * @return Client router node ID. - */ - public UUID clientRouterNodeId() { - return clientRouterNodeId; - } - - /** - * @param clientRouterNodeId Client router node ID. - */ - public void clientRouterNodeId(UUID clientRouterNodeId) { - this.clientRouterNodeId = clientRouterNodeId; - } - - /** {@inheritDoc} */ - @Override public int compareTo(@Nullable GridTcpDiscoveryNode node) { - if (node == null) - return 1; - - if (internalOrder() == node.internalOrder()) - assert id().equals(node.id()) : "Duplicate order [this=" + this + ", other=" + node + ']'; - - return internalOrder() < node.internalOrder() ? -1 : internalOrder() > node.internalOrder() ? 
1 : - id().compareTo(node.id()); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeUuid(out, id); - U.writeMap(out, attrs); - U.writeCollection(out, addrs); - U.writeCollection(out, hostNames); - out.writeInt(discPort); - - byte[] mtr = null; - - if (metrics != null) { - mtr = new byte[DiscoveryMetricsHelper.METRICS_SIZE]; - - DiscoveryMetricsHelper.serialize(mtr, 0, metrics); - } - - U.writeByteArray(out, mtr); - - out.writeLong(order); - out.writeLong(intOrder); - out.writeObject(ver); - U.writeUuid(out, clientRouterNodeId); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - id = U.readUuid(in); - - attrs = U.sealMap(U.<String, Object>readMap(in)); - addrs = U.readCollection(in); - hostNames = U.readCollection(in); - discPort = in.readInt(); - - sockAddrs = U.toSocketAddresses(this, discPort); - - consistentId = U.consistentId(addrs, discPort); - - byte[] mtr = U.readByteArray(in); - - if (mtr != null) - metrics = DiscoveryMetricsHelper.deserialize(mtr, 0); - - order = in.readLong(); - intOrder = in.readLong(); - ver = (IgniteProductVersion)in.readObject(); - clientRouterNodeId = U.readUuid(in); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return id.hashCode(); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - return F.eqNodes(this, o); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridTcpDiscoveryNode.class, this, "isClient", isClient()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/191aae27/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/GridTcpDiscoveryNodesRing.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/GridTcpDiscoveryNodesRing.java 
b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/GridTcpDiscoveryNodesRing.java deleted file mode 100644 index 25c9192..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/GridTcpDiscoveryNodesRing.java +++ /dev/null @@ -1,636 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.discovery.tcp.internal; - -import org.apache.ignite.cluster.*; -import org.apache.ignite.lang.*; -import org.gridgain.grid.util.tostring.*; -import org.gridgain.grid.util.typedef.*; -import org.gridgain.grid.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.util.*; -import java.util.concurrent.locks.*; - -/** - * Convenient way to represent topology for {@link org.gridgain.grid.spi.discovery.tcp.TcpDiscoverySpi} - */ -public class GridTcpDiscoveryNodesRing { - /** Visible nodes filter. */ - private static final IgnitePredicate<GridTcpDiscoveryNode> VISIBLE_NODES = new P1<GridTcpDiscoveryNode>() { - @Override public boolean apply(GridTcpDiscoveryNode node) { - return node.visible(); - } - }; - - /** Client nodes filter. */ - private static final PN CLIENT_NODES = new PN() { - @Override public boolean apply(ClusterNode node) { - return node.isClient(); - } - }; - - /** Local node. */ - private GridTcpDiscoveryNode locNode; - - /** All nodes in topology. */ - @GridToStringInclude - private NavigableSet<GridTcpDiscoveryNode> nodes = new TreeSet<>(); - - /** All started nodes. */ - @GridToStringExclude - private Map<UUID, GridTcpDiscoveryNode> nodesMap = new HashMap<>(); - - /** Current topology version */ - private long topVer; - - /** */ - private long nodeOrder; - - /** Lock. 
*/ - @GridToStringExclude - private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); - - /** - * Sets local node. - * - * @param locNode Local node. - */ - public void localNode(GridTcpDiscoveryNode locNode) { - assert locNode != null; - - rwLock.writeLock().lock(); - - try { - this.locNode = locNode; - - clear(); - } - finally { - rwLock.writeLock().unlock(); - } - } - - /** - * Gets all nodes in the topology. - * - * @return Collection of all nodes. - */ - public Collection<GridTcpDiscoveryNode> allNodes() { - return nodes(); - } - - /** - * Gets visible nodes in the topology. - * - * @return Collection of visible nodes. - */ - public Collection<GridTcpDiscoveryNode> visibleNodes() { - return nodes(VISIBLE_NODES); - } - - /** - * Gets remote nodes. - * - * @return Collection of remote nodes in grid. - */ - public Collection<GridTcpDiscoveryNode> remoteNodes() { - return nodes(F.remoteNodes(locNode.id())); - } - - /** - * Gets visible remote nodes in the topology. - * - * @return Collection of visible remote nodes. - */ - public Collection<GridTcpDiscoveryNode> visibleRemoteNodes() { - return nodes(VISIBLE_NODES, F.remoteNodes(locNode.id())); - } - - /** - * @return Client nodes. - */ - public Collection<GridTcpDiscoveryNode> clientNodes() { - return nodes(CLIENT_NODES); - } - - /** - * Checks whether the topology has remote nodes in. - * - * @return {@code true} if the topology has remote nodes in. - */ - public boolean hasRemoteNodes() { - rwLock.readLock().lock(); - - try { - return nodes.size() > 1; - } - finally { - rwLock.readLock().unlock(); - } - } - - /** - * Adds node to topology, also initializes node last update time with current - * system time. - * - * @param node Node to add. - * @return {@code true} if such node was added and did not present previously in the topology. 
- */ - public boolean add(GridTcpDiscoveryNode node) { - assert node != null; - assert node.internalOrder() > 0; - - rwLock.writeLock().lock(); - - try { - if (nodesMap.containsKey(node.id())) - return false; - - assert node.internalOrder() > maxInternalOrder() : "Adding node to the middle of the ring " + - "[ring=" + this + ", node=" + node + ']'; - - nodesMap.put(node.id(), node); - - nodes = new TreeSet<>(nodes); - - node.lastUpdateTime(U.currentTimeMillis()); - - nodes.add(node); - - nodeOrder = node.internalOrder(); - } - finally { - rwLock.writeLock().unlock(); - } - - return true; - } - - /** - * @return Max internal order. - */ - public long maxInternalOrder() { - rwLock.readLock().lock(); - - try { - GridTcpDiscoveryNode last = nodes.last(); - - return last != null ? last.internalOrder() : -1; - } - finally { - rwLock.readLock().unlock(); - } - } - - /** - * Restores topology from parameters values. - * <p> - * This method is called when new node receives topology from coordinator. - * In this case all nodes received are remote for local. - * <p> - * Also initializes nodes last update time with current system time. - * - * @param nodes List of remote nodes. - * @param topVer Topology version. - */ - public void restoreTopology(Iterable<GridTcpDiscoveryNode> nodes, long topVer) { - assert !F.isEmpty(nodes); - assert topVer > 0; - - rwLock.writeLock().lock(); - - try { - locNode.internalOrder(topVer); - - clear(); - - boolean firstAdd = true; - - for (GridTcpDiscoveryNode node : nodes) { - if (nodesMap.containsKey(node.id())) - continue; - - nodesMap.put(node.id(), node); - - if (firstAdd) { - this.nodes = new TreeSet<>(this.nodes); - - firstAdd = false; - } - - node.lastUpdateTime(U.currentTimeMillis()); - - this.nodes.add(node); - } - - nodeOrder = topVer; - } - finally { - rwLock.writeLock().unlock(); - } - } - - /** - * Finds node by ID. - * - * @param nodeId Node id to find. - * @return Node with ID provided or {@code null} if not found. 
- */ - @Nullable public GridTcpDiscoveryNode node(UUID nodeId) { - assert nodeId != null; - - rwLock.readLock().lock(); - - try { - return nodesMap.get(nodeId); - } - finally { - rwLock.readLock().unlock(); - } - } - - /** - * Removes node from the topology. - * - * @param nodeId ID of the node to remove. - * @return {@code true} if node was removed. - */ - @Nullable public GridTcpDiscoveryNode removeNode(UUID nodeId) { - assert nodeId != null; - assert !locNode.id().equals(nodeId); - - rwLock.writeLock().lock(); - - try { - GridTcpDiscoveryNode rmv = nodesMap.remove(nodeId); - - if (rmv != null) { - nodes = new TreeSet<>(nodes); - - nodes.remove(rmv); - } - - return rmv; - } - finally { - rwLock.writeLock().unlock(); - } - } - - /** - * Removes nodes from the topology. - * - * @param nodeIds IDs of the nodes to remove. - * @return Collection of removed nodes. - */ - public Collection<GridTcpDiscoveryNode> removeNodes(Collection<UUID> nodeIds) { - assert !F.isEmpty(nodeIds); - - rwLock.writeLock().lock(); - - try { - boolean firstRmv = true; - - Collection<GridTcpDiscoveryNode> res = null; - - for (UUID id : nodeIds) { - GridTcpDiscoveryNode rmv = nodesMap.remove(id); - - if (rmv != null) { - if (firstRmv) { - nodes = new TreeSet<>(nodes); - - res = new ArrayList<>(nodeIds.size()); - - firstRmv = false; - } - - nodes.remove(rmv); - - res.add(rmv); - } - } - - return res == null ? Collections.<GridTcpDiscoveryNode>emptyList() : res; - } - finally { - rwLock.writeLock().unlock(); - } - } - - /** - * Removes all remote nodes, leaves only local node. - * <p> - * This should be called when SPI should be disconnected from topology and - * reconnected back after. 
- */ - public void clear() { - rwLock.writeLock().lock(); - - try { - nodes = new TreeSet<>(); - - if (locNode != null) - nodes.add(locNode); - - nodesMap = new HashMap<>(); - - if (locNode != null) - nodesMap.put(locNode.id(), locNode); - - nodeOrder = 0; - - topVer = 0; - } - finally { - rwLock.writeLock().unlock(); - } - } - - /** - * Finds coordinator in the topology. - * - * @return Coordinator node that gives versions to topology (node with the smallest order). - */ - @Nullable public GridTcpDiscoveryNode coordinator() { - rwLock.readLock().lock(); - - try { - if (F.isEmpty(nodes)) - return null; - - return coordinator(null); - } - finally { - rwLock.readLock().unlock(); - } - } - - /** - * Finds coordinator in the topology filtering excluded nodes from the search. - * <p> - * This may be used when handling current coordinator leave or failure. - * - * @param excluded Nodes to exclude from the search (optional). - * @return Coordinator node among remaining nodes or {@code null} if all nodes are excluded. - */ - @Nullable public GridTcpDiscoveryNode coordinator(@Nullable Collection<GridTcpDiscoveryNode> excluded) { - rwLock.readLock().lock(); - - try { - Collection<GridTcpDiscoveryNode> filtered = serverNodes(excluded); - - if (F.isEmpty(filtered)) - return null; - - return Collections.min(filtered); - } - finally { - rwLock.readLock().unlock(); - } - } - - /** - * Finds next node in the topology. - * - * @return Next node. - */ - @Nullable public GridTcpDiscoveryNode nextNode() { - rwLock.readLock().lock(); - - try { - if (nodes.size() < 2) - return null; - - return nextNode(null); - } - finally { - rwLock.readLock().unlock(); - } - } - - /** - * Finds next node in the topology filtering excluded nodes from search. - * <p> - * This may be used when detecting and handling nodes failure. - * - * @param excluded Nodes to exclude from the search (optional). If provided, - * cannot contain local node. 
- * @return Next node or {@code null} if all nodes were filtered out or - * topology contains less than two nodes. - */ - @Nullable public GridTcpDiscoveryNode nextNode(@Nullable Collection<GridTcpDiscoveryNode> excluded) { - assert excluded == null || excluded.isEmpty() || !excluded.contains(locNode); - - rwLock.readLock().lock(); - - try { - Collection<GridTcpDiscoveryNode> filtered = serverNodes(excluded); - - if (filtered.size() < 2) - return null; - - Iterator<GridTcpDiscoveryNode> iter = filtered.iterator(); - - while (iter.hasNext()) { - GridTcpDiscoveryNode node = iter.next(); - - if (locNode.equals(node)) - break; - } - - return iter.hasNext() ? iter.next() : F.first(filtered); - } - finally { - rwLock.readLock().unlock(); - } - } - - /** - * Finds previous node in the topology. - * - * @return Previous node. - */ - @Nullable public GridTcpDiscoveryNode previousNode() { - rwLock.readLock().lock(); - - try { - if (nodes.size() < 2) - return null; - - return previousNode(null); - } - finally { - rwLock.readLock().unlock(); - } - } - - /** - * Finds previous node in the topology filtering excluded nodes from search. - * - * @param excluded Nodes to exclude from the search (optional). If provided, - * cannot contain local node. - * @return Previous node or {@code null} if all nodes were filtered out or - * topology contains less than two nodes. - */ - @Nullable public GridTcpDiscoveryNode previousNode(@Nullable Collection<GridTcpDiscoveryNode> excluded) { - assert excluded == null || excluded.isEmpty() || !excluded.contains(locNode); - - rwLock.readLock().lock(); - - try { - Collection<GridTcpDiscoveryNode> filtered = serverNodes(excluded); - - if (filtered.size() < 2) - return null; - - Iterator<GridTcpDiscoveryNode> iter = filtered.iterator(); - - while (iter.hasNext()) { - GridTcpDiscoveryNode node = iter.next(); - - if (locNode.equals(node)) - break; - } - - return iter.hasNext() ? 
iter.next() : F.first(filtered); - } - finally { - rwLock.readLock().unlock(); - } - } - - /** - * Gets current topology version. - * - * @return Current topology version. - */ - public long topologyVersion() { - rwLock.readLock().lock(); - - try { - return topVer; - } - finally { - rwLock.readLock().unlock(); - } - } - - /** - * Sets new topology version. - * - * @param topVer New topology version (should be greater than current, otherwise no-op). - * @return {@code True} if topology has been changed. - */ - public boolean topologyVersion(long topVer) { - rwLock.writeLock().lock(); - - try { - if (this.topVer < topVer) { - this.topVer = topVer; - - return true; - } - - U.debug("KARAMBA [old=" + this.topVer + ", new=" + topVer + ']'); - - return false; - } - finally { - rwLock.writeLock().unlock(); - } - } - - /** - * Increments topology version and gets new value. - * - * @return Topology version (incremented). - */ - public long incrementTopologyVersion() { - rwLock.writeLock().lock(); - - try { - return ++topVer; - } - finally { - rwLock.writeLock().unlock(); - } - } - - /** - * Increments topology version and gets new value. - * - * @return Topology version (incremented). - */ - public long nextNodeOrder() { - rwLock.writeLock().lock(); - - try { - if (nodeOrder == 0) { - GridTcpDiscoveryNode last = nodes.last(); - - assert last != null; - - nodeOrder = last.internalOrder(); - } - - return ++nodeOrder; - } - finally { - rwLock.writeLock().unlock(); - } - } - - /** - * @param p Filters. - * @return Unmodifiable collection of nodes. - */ - private Collection<GridTcpDiscoveryNode> nodes(IgnitePredicate<? super GridTcpDiscoveryNode>... p) { - rwLock.readLock().lock(); - - try { - List<GridTcpDiscoveryNode> list = U.arrayList(nodes, p); - - return Collections.unmodifiableCollection(list); - } - finally { - rwLock.readLock().unlock(); - } - } - - /** - * Gets server nodes from topology. - * - * @param excluded Nodes to exclude from the search (optional). 
- * @return Collection of server nodes. - */ - private Collection<GridTcpDiscoveryNode> serverNodes(@Nullable final Collection<GridTcpDiscoveryNode> excluded) { - final boolean excludedEmpty = F.isEmpty(excluded); - - return F.view(nodes, new P1<GridTcpDiscoveryNode>() { - @Override public boolean apply(GridTcpDiscoveryNode node) { - return !node.isClient() && (excludedEmpty || !excluded.contains(node)); - } - }); - } - - /** {@inheritDoc} */ - @Override public String toString() { - rwLock.readLock().lock(); - - try { - return S.toString(GridTcpDiscoveryNodesRing.class, this); - } - finally { - rwLock.readLock().unlock(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/191aae27/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/GridTcpDiscoverySpiState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/GridTcpDiscoverySpiState.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/GridTcpDiscoverySpiState.java deleted file mode 100644 index 72d4e82..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/GridTcpDiscoverySpiState.java +++ /dev/null @@ -1,45 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.discovery.tcp.internal; - -/** - * State of local node {@link org.gridgain.grid.spi.discovery.tcp.TcpDiscoverySpi}. 
- */ -public enum GridTcpDiscoverySpiState { - /** */ - DISCONNECTED, - - /** */ - CONNECTING, - - /** */ - CONNECTED, - - /** */ - DISCONNECTING, - - /** */ - STOPPING, - - /** */ - LEFT, - - /** */ - DUPLICATE_ID, - - /** */ - AUTH_FAILED, - - /** */ - CHECK_FAILED, - - /** */ - LOOPBACK_PROBLEM -}