http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpDiscoverySpiAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpDiscoverySpiAdapter.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpDiscoverySpiAdapter.java deleted file mode 100644 index 9e6bbf1..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpDiscoverySpiAdapter.java +++ /dev/null @@ -1,996 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.discovery.tcp; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.marshaller.*; -import org.apache.ignite.marshaller.jdk.*; -import org.apache.ignite.product.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.spi.*; -import org.gridgain.grid.*; -import org.gridgain.grid.spi.discovery.*; -import org.gridgain.grid.spi.discovery.tcp.internal.*; -import org.gridgain.grid.spi.discovery.tcp.ipfinder.*; -import org.gridgain.grid.spi.discovery.tcp.messages.*; -import org.gridgain.grid.util.*; -import org.gridgain.grid.util.io.*; -import org.gridgain.grid.util.typedef.*; -import org.gridgain.grid.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.net.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static org.gridgain.grid.spi.discovery.tcp.internal.TcpDiscoverySpiState.*; - -/** - * Base class for TCP discovery SPIs. - */ -abstract class TcpDiscoverySpiAdapter extends IgniteSpiAdapter implements DiscoverySpi { - /** Default port to listen (value is <tt>47500</tt>). */ - public static final int DFLT_PORT = 47500; - - /** Default socket operations timeout in milliseconds (value is <tt>2,000ms</tt>). */ - public static final long DFLT_SOCK_TIMEOUT = 2000; - - /** Default timeout for receiving message acknowledgement in milliseconds (value is <tt>5,000ms</tt>). */ - public static final long DFLT_ACK_TIMEOUT = 5000; - - /** Default network timeout in milliseconds (value is <tt>5,000ms</tt>). */ - public static final long DFLT_NETWORK_TIMEOUT = 5000; - - /** Default value for thread priority (value is <tt>10</tt>). */ - public static final int DFLT_THREAD_PRI = 10; - - /** Default heartbeat messages issuing frequency (value is <tt>2,000ms</tt>). */ - public static final long DFLT_HEARTBEAT_FREQ = 2000; - - /** Default size of topology snapshots history. */ - public static final int DFLT_TOP_HISTORY_SIZE = 1000; - - /** Response OK. */ - protected static final int RES_OK = 1; - - /** Response CONTINUE JOIN. */ - protected static final int RES_CONTINUE_JOIN = 100; - - /** Response WAIT. */ - protected static final int RES_WAIT = 200; - - /** Local address. */ - protected String locAddr; - - /** IP finder. */ - protected TcpDiscoveryIpFinder ipFinder; - - /** Socket operations timeout. */ - protected long sockTimeout = DFLT_SOCK_TIMEOUT; - - /** Message acknowledgement timeout. */ - protected long ackTimeout = DFLT_ACK_TIMEOUT; - - /** Network timeout. */ - protected long netTimeout = DFLT_NETWORK_TIMEOUT; - - /** Thread priority for all threads started by SPI. */ - protected int threadPri = DFLT_THREAD_PRI; - - /** Heartbeat messages issuing frequency. */ - protected long hbFreq = DFLT_HEARTBEAT_FREQ; - - /** Size of topology snapshots history. */ - protected int topHistSize = DFLT_TOP_HISTORY_SIZE; - - /** Grid discovery listener. */ - protected volatile DiscoverySpiListener lsnr; - - /** Data exchange. */ - protected DiscoverySpiDataExchange exchange; - - /** Metrics provider. */ - protected DiscoveryMetricsProvider metricsProvider; - - /** Local node attributes. */ - protected Map<String, Object> locNodeAttrs; - - /** Local node version. */ - protected IgniteProductVersion locNodeVer; - - /** Local node. */ - protected TcpDiscoveryNode locNode; - - /** Local host. */ - protected InetAddress locHost; - - /** Internal and external addresses of local node. */ - protected Collection<InetSocketAddress> locNodeAddrs; - - /** Socket timeout worker. */ - protected SocketTimeoutWorker sockTimeoutWorker; - - /** Discovery state. */ - protected TcpDiscoverySpiState spiState = DISCONNECTED; - - /** Start time of the very first grid node. */ - protected volatile long gridStartTime; - - /** Marshaller. */ - protected final IgniteMarshaller marsh = new IgniteJdkMarshaller(); - - /** Statistics. */ - protected final TcpDiscoveryStatistics stats = new TcpDiscoveryStatistics(); - - /** Local node ID. */ - @IgniteLocalNodeIdResource - protected UUID locNodeId; - - /** Name of the grid. */ - @IgniteNameResource - protected String gridName; - - /** Logger. */ - @IgniteLoggerResource - protected IgniteLogger log; - - /** - * Sets local host IP address that discovery SPI uses. - * <p> - * If not provided, by default a first found non-loopback address - * will be used. If there is no non-loopback address available, - * then {@link InetAddress#getLocalHost()} will be used. - * - * @param locAddr IP address. - */ - @IgniteSpiConfiguration(optional = true) - @IgniteLocalHostResource - public void setLocalAddress(String locAddr) { - // Injection should not override value already set by Spring or user. - if (this.locAddr == null) - this.locAddr = locAddr; - } - - /** - * Gets local address that was set to SPI with {@link #setLocalAddress(String)} method. - * - * @return local address. - */ - public String getLocalAddress() { - return locAddr; - } - - /** - * Gets IP finder for IP addresses sharing and storing. - * - * @return IP finder for IP addresses sharing and storing. - */ - public TcpDiscoveryIpFinder getIpFinder() { - return ipFinder; - } - - /** - * Sets IP finder for IP addresses sharing and storing. - * <p> - * If not provided {@link org.gridgain.grid.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder} will be used by default. - * - * @param ipFinder IP finder. - */ - @IgniteSpiConfiguration(optional = true) - public void setIpFinder(TcpDiscoveryIpFinder ipFinder) { - this.ipFinder = ipFinder; - } - - /** - * Sets socket operations timeout. This timeout is used to limit connection time and - * write-to-socket time. - * <p> - * Note that when running GridGain on Amazon EC2, socket timeout must be set to a value - * significantly greater than the default (e.g. to {@code 30000}). - * <p> - * If not specified, default is {@link #DFLT_SOCK_TIMEOUT}. - * - * @param sockTimeout Socket connection timeout. - */ - @IgniteSpiConfiguration(optional = true) - public void setSocketTimeout(long sockTimeout) { - this.sockTimeout = sockTimeout; - } - - /** - * Sets timeout for receiving acknowledgement for sent message. - * <p> - * If acknowledgement is not received within this timeout, sending is considered as failed - * and SPI tries to repeat message sending. - * <p> - * If not specified, default is {@link #DFLT_ACK_TIMEOUT}. - * - * @param ackTimeout Acknowledgement timeout. - */ - @IgniteSpiConfiguration(optional = true) - public void setAckTimeout(long ackTimeout) { - this.ackTimeout = ackTimeout; - } - - /** - * Sets maximum network timeout to use for network operations. - * <p> - * If not specified, default is {@link #DFLT_NETWORK_TIMEOUT}. - * - * @param netTimeout Network timeout. - */ - @IgniteSpiConfiguration(optional = true) - public void setNetworkTimeout(long netTimeout) { - this.netTimeout = netTimeout; - } - - /** - * Sets thread priority. All threads within SPI will be started with it. - * <p> - * If not provided, default value is {@link #DFLT_THREAD_PRI} - * - * @param threadPri Thread priority. - */ - @IgniteSpiConfiguration(optional = true) - public void setThreadPriority(int threadPri) { - this.threadPri = threadPri; - } - - /** - * Sets delay between issuing of heartbeat messages. SPI sends heartbeat messages - * in configurable time interval to other nodes to notify them about its state. - * <p> - * If not provided, default value is {@link #DFLT_HEARTBEAT_FREQ}. - * - * @param hbFreq Heartbeat frequency in milliseconds. - */ - @IgniteSpiConfiguration(optional = true) - public void setHeartbeatFrequency(long hbFreq) { - this.hbFreq = hbFreq; - } - - /** - * @return Size of topology snapshots history. - */ - public long getTopHistorySize() { - return topHistSize; - } - - /** - * Sets size of topology snapshots history. Specified size should be greater than or equal to default size - * {@link #DFLT_TOP_HISTORY_SIZE}. - * - * @param topHistSize Size of topology snapshots history. - */ - @IgniteSpiConfiguration(optional = true) - public void setTopHistorySize(int topHistSize) { - if (topHistSize < DFLT_TOP_HISTORY_SIZE) { - U.warn(log, "Topology history size should be greater than or equal to default size. " + - "Specified size will not be set [curSize=" + this.topHistSize + ", specifiedSize=" + topHistSize + - ", defaultSize=" + DFLT_TOP_HISTORY_SIZE + ']'); - - return; - } - - this.topHistSize = topHistSize; - } - - /** {@inheritDoc} */ - @Override public void setNodeAttributes(Map<String, Object> attrs, IgniteProductVersion ver) { - assert locNodeAttrs == null; - assert locNodeVer == null; - - if (log.isDebugEnabled()) { - log.debug("Node attributes to set: " + attrs); - log.debug("Node version to set: " + ver); - } - - locNodeAttrs = attrs; - locNodeVer = ver; - } - - /** {@inheritDoc} */ - @Override protected void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException { - super.onContextInitialized0(spiCtx); - - ipFinder.onSpiContextInitialized(spiCtx); - } - - /** {@inheritDoc} */ - @Override protected void onContextDestroyed0() { - super.onContextDestroyed0(); - - ipFinder.onSpiContextDestroyed(); - } - - /** {@inheritDoc} */ - @Override public ClusterNode getLocalNode() { - return locNode; - } - - /** {@inheritDoc} */ - @Override public void setListener(@Nullable DiscoverySpiListener lsnr) { - this.lsnr = lsnr; - } - - /** {@inheritDoc} */ - @Override public void setDataExchange(DiscoverySpiDataExchange exchange) { - this.exchange = exchange; - } - - /** {@inheritDoc} */ - @Override public void setMetricsProvider(DiscoveryMetricsProvider metricsProvider) { - this.metricsProvider = metricsProvider; - } - - /** {@inheritDoc} */ - @Override public long getGridStartTime() { - assert gridStartTime != 0; - - return gridStartTime; - } - - /** - * @param sockAddr Remote address. - * @return Opened socket. - * @throws IOException If failed. - */ - protected Socket openSocket(InetSocketAddress sockAddr) throws IOException { - assert sockAddr != null; - - InetSocketAddress resolved = sockAddr.isUnresolved() ? - new InetSocketAddress(InetAddress.getByName(sockAddr.getHostName()), sockAddr.getPort()) : sockAddr; - - InetAddress addr = resolved.getAddress(); - - assert addr != null; - - Socket sock = new Socket(); - - sock.bind(new InetSocketAddress(locHost, 0)); - - sock.setTcpNoDelay(true); - - sock.connect(resolved, (int)sockTimeout); - - writeToSocket(sock, U.GG_HEADER); - - return sock; - } - - /** - * Writes message to the socket. - * - * @param sock Socket. - * @param data Raw data to write. - * @throws IOException If IO failed or write timed out. - */ - @SuppressWarnings("ThrowFromFinallyBlock") - protected void writeToSocket(Socket sock, byte[] data) throws IOException { - assert sock != null; - assert data != null; - - SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout); - - sockTimeoutWorker.addTimeoutObject(obj); - - IOException err = null; - - try { - OutputStream out = sock.getOutputStream(); - - out.write(data); - - out.flush(); - } - catch (IOException e) { - err = e; - } - finally { - boolean cancelled = obj.cancel(); - - if (cancelled) - sockTimeoutWorker.removeTimeoutObject(obj); - - // Throw original exception. - if (err != null) - throw err; - - if (!cancelled) - throw new SocketTimeoutException("Write timed out (socket was concurrently closed)."); - } - } - - /** - * Writes message to the socket. - * - * @param sock Socket. - * @param msg Message. - * @throws IOException If IO failed or write timed out. - * @throws GridException If marshalling failed. - */ - protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg) throws IOException, GridException { - writeToSocket(sock, msg, new GridByteArrayOutputStream(8 * 1024)); // 8K. - } - - /** - * Writes message to the socket. - * - * @param sock Socket. - * @param msg Message. - * @param bout Byte array output stream. - * @throws IOException If IO failed or write timed out. - * @throws GridException If marshalling failed. - */ - @SuppressWarnings("ThrowFromFinallyBlock") - protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, GridByteArrayOutputStream bout) - throws IOException, GridException { - assert sock != null; - assert msg != null; - assert bout != null; - - // Marshall message first to perform only write after. - marsh.marshal(msg, bout); - - SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout); - - sockTimeoutWorker.addTimeoutObject(obj); - - IOException err = null; - - try { - OutputStream out = sock.getOutputStream(); - - bout.writeTo(out); - - out.flush(); - } - catch (IOException e) { - err = e; - } - finally { - boolean cancelled = obj.cancel(); - - if (cancelled) - sockTimeoutWorker.removeTimeoutObject(obj); - - // Throw original exception. - if (err != null) - throw err; - - if (!cancelled) - throw new SocketTimeoutException("Write timed out (socket was concurrently closed)."); - } - } - - /** - * Writes response to the socket. - * - * @param sock Socket. - * @param res Integer response. - * @throws IOException If IO failed or write timed out. - */ - @SuppressWarnings("ThrowFromFinallyBlock") - protected void writeToSocket(Socket sock, int res) throws IOException { - assert sock != null; - - SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout); - - sockTimeoutWorker.addTimeoutObject(obj); - - OutputStream out = sock.getOutputStream(); - - IOException err = null; - - try { - out.write(res); - - out.flush(); - } - catch (IOException e) { - err = e; - } - finally { - boolean cancelled = obj.cancel(); - - if (cancelled) - sockTimeoutWorker.removeTimeoutObject(obj); - - // Throw original exception. - if (err != null) - throw err; - - if (!cancelled) - throw new SocketTimeoutException("Write timed out (socket was concurrently closed)."); - } - } - - /** - * Reads message from the socket limiting read time. - * - * @param sock Socket. - * @param in Input stream (in case socket stream was wrapped). - * @param timeout Socket timeout for this operation. - * @return Message. - * @throws IOException If IO failed or read timed out. - * @throws GridException If unmarshalling failed. - */ - protected <T> T readMessage(Socket sock, @Nullable InputStream in, long timeout) throws IOException, GridException { - assert sock != null; - - int oldTimeout = sock.getSoTimeout(); - - try { - sock.setSoTimeout((int)timeout); - - return marsh.unmarshal(in == null ? sock.getInputStream() : in, U.gridClassLoader()); - } - catch (IOException | GridException e) { - if (X.hasCause(e, SocketTimeoutException.class)) - LT.warn(log, null, "Timed out waiting for message to be read (most probably, the reason is " + - "in long GC pauses on remote node. Current timeout: " + timeout + '.'); - - throw e; - } - finally { - // Quietly restore timeout. - try { - sock.setSoTimeout(oldTimeout); - } - catch (SocketException ignored) { - // No-op. - } - } - } - - /** - * Reads message delivery receipt from the socket. - * - * @param sock Socket. - * @param timeout Socket timeout for this operation. - * @return Receipt. - * @throws IOException If IO failed or read timed out. - */ - protected int readReceipt(Socket sock, long timeout) throws IOException { - assert sock != null; - - int oldTimeout = sock.getSoTimeout(); - - try { - sock.setSoTimeout((int)timeout); - - int res = sock.getInputStream().read(); - - if (res == -1) - throw new EOFException(); - - return res; - } - catch (SocketTimeoutException e) { - LT.warn(log, null, "Timed out waiting for message delivery receipt (most probably, the reason is " + - "in long GC pauses on remote node; consider tuning GC and increasing 'ackTimeout' " + - "configuration property). Will retry to send message with increased timeout. " + - "Current timeout: " + timeout + '.'); - - stats.onAckTimeout(); - - throw e; - } - finally { - // Quietly restore timeout. - try { - sock.setSoTimeout(oldTimeout); - } - catch (SocketException ignored) { - // No-op. - } - } - } - - /** - * Resolves addresses registered in the IP finder, removes duplicates and local host - * address and returns the collection of. - * - * @return Resolved addresses without duplicates and local address (potentially - * empty but never null). - * @throws org.apache.ignite.spi.IgniteSpiException If an error occurs. - */ - protected Collection<InetSocketAddress> resolvedAddresses() throws IgniteSpiException { - List<InetSocketAddress> res = new ArrayList<>(); - - Collection<InetSocketAddress> addrs; - - // Get consistent addresses collection. - while (true) { - try { - addrs = registeredAddresses(); - - break; - } - catch (IgniteSpiException e) { - LT.error(log, e, "Failed to get registered addresses from IP finder on start " + - "(retrying every 2000 ms)."); - } - - try { - U.sleep(2000); - } - catch (GridInterruptedException e) { - throw new IgniteSpiException("Thread has been interrupted.", e); - } - } - - for (InetSocketAddress addr : addrs) { - assert addr != null; - - try { - InetSocketAddress resolved = addr.isUnresolved() ? - new InetSocketAddress(InetAddress.getByName(addr.getHostName()), addr.getPort()) : addr; - - if (locNodeAddrs == null || !locNodeAddrs.contains(resolved)) - res.add(resolved); - } - catch (UnknownHostException ignored) { - LT.warn(log, null, "Failed to resolve address from IP finder (host is unknown): " + addr); - - // Add address in any case. - res.add(addr); - } - } - - if (!res.isEmpty()) - Collections.shuffle(res); - - return res; - } - - /** - * Gets addresses registered in the IP finder, initializes addresses having no - * port (or 0 port) with {@link #DFLT_PORT}. - * - * @return Registered addresses. - * @throws org.apache.ignite.spi.IgniteSpiException If an error occurs. - */ - protected Collection<InetSocketAddress> registeredAddresses() throws IgniteSpiException { - Collection<InetSocketAddress> res = new LinkedList<>(); - - for (InetSocketAddress addr : ipFinder.getRegisteredAddresses()) { - if (addr.getPort() == 0) - addr = addr.isUnresolved() ? new InetSocketAddress(addr.getHostName(), DFLT_PORT) : - new InetSocketAddress(addr.getAddress(), DFLT_PORT); - - res.add(addr); - } - - return res; - } - - /** - * @param msg Message. - * @return Error. - */ - protected IgniteSpiException duplicateIdError(TcpDiscoveryDuplicateIdMessage msg) { - assert msg != null; - - return new IgniteSpiException("Local node has the same ID as existing node in topology " + - "(fix configuration and restart local node) [localNode=" + locNode + - ", existingNode=" + msg.node() + ']'); - } - - /** - * @param msg Message. - * @return Error. - */ - protected IgniteSpiException authenticationFailedError(TcpDiscoveryAuthFailedMessage msg) { - assert msg != null; - - return new IgniteSpiException(new GridAuthenticationException("Authentication failed [nodeId=" + - msg.creatorNodeId() + ", addr=" + msg.address().getHostAddress() + ']')); - } - - /** - * @param msg Message. - * @return Error. - */ - protected IgniteSpiException checkFailedError(TcpDiscoveryCheckFailedMessage msg) { - assert msg != null; - - return versionCheckFailed(msg) ? new IgniteSpiVersionCheckException(msg.error()) : - new IgniteSpiException(msg.error()); - } - - /** - * @param msg Message. - * @return Whether delivery of the message is ensured. - */ - protected boolean ensured(TcpDiscoveryAbstractMessage msg) { - return U.getAnnotation(msg.getClass(), TcpDiscoveryEnsureDelivery.class) != null; - } - - /** - * @param msg Failed message. - * @return {@code True} if specified failed message relates to version incompatibility, {@code false} otherwise. - * @deprecated Parsing of error message was used for preserving backward compatibility. We should remove it - * and create separate message for failed version check with next major release. - */ - @Deprecated - private static boolean versionCheckFailed(TcpDiscoveryCheckFailedMessage msg) { - return msg.error().contains("versions are not compatible"); - } - - /** - * Handles sockets timeouts. - */ - protected class SocketTimeoutWorker extends IgniteSpiThread { - /** Time-based sorted set for timeout objects. */ - private final GridConcurrentSkipListSet<SocketTimeoutObject> timeoutObjs = - new GridConcurrentSkipListSet<>(new Comparator<SocketTimeoutObject>() { - @Override public int compare(SocketTimeoutObject o1, SocketTimeoutObject o2) { - long time1 = o1.endTime(); - long time2 = o2.endTime(); - - long id1 = o1.id(); - long id2 = o2.id(); - - return time1 < time2 ? -1 : time1 > time2 ? 1 : - id1 < id2 ? -1 : id1 > id2 ? 1 : 0; - } - }); - - /** Mutex. */ - private final Object mux0 = new Object(); - - /** - * - */ - SocketTimeoutWorker() { - super(gridName, "tcp-disco-sock-timeout-worker", log); - - setPriority(threadPri); - } - - /** - * @param timeoutObj Timeout object to add. - */ - @SuppressWarnings({"NakedNotify"}) - public void addTimeoutObject(SocketTimeoutObject timeoutObj) { - assert timeoutObj != null && timeoutObj.endTime() > 0 && timeoutObj.endTime() != Long.MAX_VALUE; - - timeoutObjs.add(timeoutObj); - - if (timeoutObjs.firstx() == timeoutObj) { - synchronized (mux0) { - mux0.notifyAll(); - } - } - } - - /** - * @param timeoutObj Timeout object to remove. - */ - public void removeTimeoutObject(SocketTimeoutObject timeoutObj) { - assert timeoutObj != null; - - timeoutObjs.remove(timeoutObj); - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException { - if (log.isDebugEnabled()) - log.debug("Socket timeout worker has been started."); - - while (!isInterrupted()) { - long now = U.currentTimeMillis(); - - for (Iterator<SocketTimeoutObject> iter = timeoutObjs.iterator(); iter.hasNext(); ) { - SocketTimeoutObject timeoutObj = iter.next(); - - if (timeoutObj.endTime() <= now) { - iter.remove(); - - if (timeoutObj.onTimeout()) { - LT.warn(log, null, "Socket write has timed out (consider increasing " + - "'sockTimeout' configuration property) [sockTimeout=" + sockTimeout + ']'); - - stats.onSocketTimeout(); - } - } - else - break; - } - - synchronized (mux0) { - while (true) { - // Access of the first element must be inside of - // synchronization block, so we don't miss out - // on thread notification events sent from - // 'addTimeoutObject(..)' method. - SocketTimeoutObject first = timeoutObjs.firstx(); - - if (first != null) { - long waitTime = first.endTime() - U.currentTimeMillis(); - - if (waitTime > 0) - mux0.wait(waitTime); - else - break; - } - else - mux0.wait(5000); - } - } - } - } - } - - /** - * Socket timeout object. - */ - protected static class SocketTimeoutObject { - /** */ - private static final AtomicLong idGen = new AtomicLong(); - - /** */ - private final long id = idGen.incrementAndGet(); - - /** */ - private final Socket sock; - - /** */ - private final long endTime; - - /** */ - private final AtomicBoolean done = new AtomicBoolean(); - - /** - * @param sock Socket. - * @param endTime End time. - */ - SocketTimeoutObject(Socket sock, long endTime) { - assert sock != null; - assert endTime > 0; - - this.sock = sock; - this.endTime = endTime; - } - - /** - * @return {@code True} if object has not yet been processed. - */ - boolean cancel() { - return done.compareAndSet(false, true); - } - - /** - * @return {@code True} if object has not yet been canceled. - */ - boolean onTimeout() { - if (done.compareAndSet(false, true)) { - // Close socket - timeout occurred. - U.closeQuiet(sock); - - return true; - } - - return false; - } - - /** - * @return End time. - */ - long endTime() { - return endTime; - } - - /** - * @return ID. - */ - long id() { - return id; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(SocketTimeoutObject.class, this); - } - } - - /** - * Base class for message workers. - */ - protected abstract class MessageWorkerAdapter extends IgniteSpiThread { - /** Pre-allocated output stream (100K). */ - private final GridByteArrayOutputStream bout = new GridByteArrayOutputStream(100 * 1024); - - /** Message queue. */ - private final BlockingDeque<TcpDiscoveryAbstractMessage> queue = new LinkedBlockingDeque<>(); - - /** Backed interrupted flag. */ - private volatile boolean interrupted; - - /** - * @param name Thread name. - */ - protected MessageWorkerAdapter(String name) { - super(gridName, name, log); - - setPriority(threadPri); - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException { - if (log.isDebugEnabled()) - log.debug("Message worker started [locNodeId=" + locNodeId + ']'); - - while (!isInterrupted()) { - TcpDiscoveryAbstractMessage msg = queue.poll(2000, TimeUnit.MILLISECONDS); - - if (msg == null) - continue; - - processMessage(msg); - } - } - - /** {@inheritDoc} */ - @Override public void interrupt() { - interrupted = true; - - super.interrupt(); - } - - /** {@inheritDoc} */ - @Override public boolean isInterrupted() { - return interrupted || super.isInterrupted(); - } - - /** - * @return Current queue size. - */ - int queueSize() { - return queue.size(); - } - - /** - * Adds message to queue. - * - * @param msg Message to add. - */ - void addMessage(TcpDiscoveryAbstractMessage msg) { - assert msg != null; - - if (msg instanceof TcpDiscoveryHeartbeatMessage) - queue.addFirst(msg); - else - queue.add(msg); - - if (log.isDebugEnabled()) - log.debug("Message has been added to queue: " + msg); - } - - protected abstract void processMessage(TcpDiscoveryAbstractMessage msg); - - /** - * @param sock Socket. - * @param msg Message. - * @throws IOException If IO failed. - * @throws GridException If marshalling failed. - */ - protected final void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg) - throws IOException, GridException { - bout.reset(); - - TcpDiscoverySpiAdapter.this.writeToSocket(sock, msg, bout); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpDiscoverySpiMBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpDiscoverySpiMBean.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpDiscoverySpiMBean.java deleted file mode 100644 index 6e6a33f..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpDiscoverySpiMBean.java +++ /dev/null @@ -1,267 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.discovery.tcp; - -import org.apache.ignite.mbean.*; -import org.apache.ignite.spi.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Management bean for {@link TcpDiscoverySpi}. - */ -public interface TcpDiscoverySpiMBean extends IgniteSpiManagementMBean { - /** - * Gets delay between heartbeat messages sent by coordinator. - * - * @return Time period in milliseconds. - */ - @IgniteMBeanDescription("Heartbeat frequency.") - public long getHeartbeatFrequency(); - - /** - * Gets current SPI state. - * - * @return Current SPI state. - */ - @IgniteMBeanDescription("SPI state.") - public String getSpiState(); - - /** - * Gets {@link org.gridgain.grid.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder} (string representation). - * - * @return IPFinder (string representation). - */ - @IgniteMBeanDescription("IP Finder.") - public String getIpFinderFormatted(); - - /** - * Gets number of connection attempts. - * - * @return Number of connection attempts. - */ - @IgniteMBeanDescription("Reconnect count.") - public int getReconnectCount(); - - /** - * Gets network timeout. - * - * @return Network timeout. - */ - @IgniteMBeanDescription("Network timeout.") - public long getNetworkTimeout(); - - /** - * Gets local TCP port SPI listens to. - * - * @return Local port range. - */ - @IgniteMBeanDescription("Local TCP port.") - public int getLocalPort(); - - /** - * Gets local TCP port range. - * - * @return Local port range. - */ - @IgniteMBeanDescription("Local TCP port range.") - public int getLocalPortRange(); - - /** - * Gets max heartbeats count node can miss without initiating status check. - * - * @return Max missed heartbeats. - */ - @IgniteMBeanDescription("Max missed heartbeats.") - public int getMaxMissedHeartbeats(); - - /** - * Gets max heartbeats count node can miss without failing client node. - * - * @return Max missed client heartbeats. - */ - @IgniteMBeanDescription("Max missed client heartbeats.") - public int getMaxMissedClientHeartbeats(); - - /** - * Gets thread priority. All threads within SPI will be started with it. - * - * @return Thread priority. - */ - @IgniteMBeanDescription("Threads priority.") - public int getThreadPriority(); - - /** - * Gets IP finder clean frequency. - * - * @return IP finder clean frequency. - */ - @IgniteMBeanDescription("IP finder clean frequency.") - public long getIpFinderCleanFrequency(); - - /** - * Gets statistics print frequency. - * - * @return Statistics print frequency in milliseconds. - */ - @IgniteMBeanDescription("Statistics print frequency.") - public long getStatisticsPrintFrequency(); - - /** - * Gets message worker queue current size. - * - * @return Message worker queue current size. - */ - @IgniteMBeanDescription("Message worker queue current size.") - public int getMessageWorkerQueueSize(); - - /** - * Gets joined nodes count. - * - * @return Nodes joined count. - */ - @IgniteMBeanDescription("Nodes joined count.") - public long getNodesJoined(); - - /** - * Gets left nodes count. - * - * @return Left nodes count. - */ - @IgniteMBeanDescription("Nodes left count.") - public long getNodesLeft(); - - /** - * Gets failed nodes count. - * - * @return Failed nodes count. - */ - @IgniteMBeanDescription("Nodes failed count.") - public long getNodesFailed(); - - /** - * Gets pending messages registered count. - * - * @return Pending messages registered count. - */ - @IgniteMBeanDescription("Pending messages registered.") - public long getPendingMessagesRegistered(); - - /** - * Gets pending messages discarded count. - * - * @return Pending messages registered count. - */ - @IgniteMBeanDescription("Pending messages discarded.") - public long getPendingMessagesDiscarded(); - - /** - * Gets avg message processing time. - * - * @return Avg message processing time. - */ - @IgniteMBeanDescription("Avg message processing time.") - public long getAvgMessageProcessingTime(); - - /** - * Gets max message processing time. - * - * @return Max message processing time. - */ - @IgniteMBeanDescription("Max message processing time.") - public long getMaxMessageProcessingTime(); - - /** - * Gets total received messages count. - * - * @return Total received messages count. - */ - @IgniteMBeanDescription("Total received messages count.") - public int getTotalReceivedMessages(); - - /** - * Gets received messages counts (grouped by type). - * - * @return Map containing message types and respective counts. - */ - @IgniteMBeanDescription("Received messages by type.") - public Map<String, Integer> getReceivedMessages(); - - /** - * Gets total processed messages count. - * - * @return Total processed messages count. - */ - @IgniteMBeanDescription("Total processed messages count.") - public int getTotalProcessedMessages(); - - /** - * Gets processed messages counts (grouped by type). - * - * @return Map containing message types and respective counts. - */ - @IgniteMBeanDescription("Received messages by type.") - public Map<String, Integer> getProcessedMessages(); - - /** - * Gets time local node has been coordinator since. - * - * @return Time local node is coordinator since. - */ - @IgniteMBeanDescription("Local node is coordinator since.") - public long getCoordinatorSinceTimestamp(); - - /** - * Gets current coordinator. - * - * @return Gets current coordinator. - */ - @IgniteMBeanDescription("Coordinator node ID.") - @Nullable public UUID getCoordinator(); - - /** - * Gets message acknowledgement timeout. - * - * @return Message acknowledgement timeout. - */ - @IgniteMBeanDescription("Message acknowledgement timeout.") - public long getAckTimeout(); - - /** - * Gets maximum message acknowledgement timeout. - * - * @return Maximum message acknowledgement timeout. - */ - @IgniteMBeanDescription("Maximum message acknowledgement timeout.") - public long getMaxAckTimeout(); - - /** - * Gets socket timeout. - * - * @return Socket timeout. - */ - @IgniteMBeanDescription("Socket timeout.") - public long getSocketTimeout(); - - /** - * Gets join timeout. - * - * @return Join timeout. - */ - @IgniteMBeanDescription("Join timeout.") - public long getJoinTimeout(); - - /** - * Dumps debug info using configured logger. - */ - @IgniteMBeanDescription("Dump debug info.") - public void dumpDebugInfo(); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/TcpDiscoveryNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/TcpDiscoveryNode.java deleted file mode 100644 index fe2d419..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/TcpDiscoveryNode.java +++ /dev/null @@ -1,443 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.discovery.tcp.internal; - -import org.apache.ignite.cluster.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.product.*; -import org.gridgain.grid.kernal.*; -import org.gridgain.grid.spi.discovery.*; -import org.gridgain.grid.util.lang.*; -import org.gridgain.grid.util.tostring.*; -import org.gridgain.grid.util.typedef.*; -import org.gridgain.grid.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.net.*; -import java.util.*; - -import static org.gridgain.grid.kernal.GridNodeAttributes.*; - -/** - * Node for {@link org.gridgain.grid.spi.discovery.tcp.TcpDiscoverySpi}. - * <p> - * <strong>This class is not intended for public use</strong> and has been made - * <tt>public</tt> due to certain limitations of Java technology. - */ -public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements ClusterNode, - Comparable<TcpDiscoveryNode>, Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Node ID. */ - private UUID id; - - /** Consistent ID. */ - private Object consistentId; - - /** Node attributes. */ - @GridToStringExclude - private Map<String, Object> attrs; - - /** Internal discovery addresses as strings. */ - @GridToStringInclude - private Collection<String> addrs; - - /** Internal discovery host names as strings. */ - private Collection<String> hostNames; - - /** */ - @GridToStringInclude - private Collection<InetSocketAddress> sockAddrs; - - /** */ - @GridToStringInclude - private int discPort; - - /** Node metrics. */ - @GridToStringExclude - private volatile ClusterNodeMetrics metrics; - - /** Node order in the topology. */ - private volatile long order; - - /** Node order in the topology (internal). */ - private volatile long intOrder; - - /** The most recent time when heartbeat message was received from the node. */ - @GridToStringExclude - private volatile long lastUpdateTime = U.currentTimeMillis(); - - /** Metrics provider (transient). */ - @GridToStringExclude - private DiscoveryMetricsProvider metricsProvider; - - /** Visible flag (transient). */ - @GridToStringExclude - private boolean visible; - - /** Grid local node flag (transient). */ - private boolean loc; - - /** Version. */ - private IgniteProductVersion ver; - - /** Alive check (used by clients). */ - @GridToStringExclude - private transient int aliveCheck; - - /** Client router node ID. */ - @GridToStringExclude - private UUID clientRouterNodeId; - - /** - * Public default no-arg constructor for {@link Externalizable} interface. - */ - public TcpDiscoveryNode() { - // No-op. - } - - /** - * Constructor. - * - * @param id Node Id. - * @param addrs Addresses. - * @param hostNames Host names. - * @param discPort Port. - * @param metricsProvider Metrics provider. - * @param ver Version. - */ - public TcpDiscoveryNode(UUID id, Collection<String> addrs, Collection<String> hostNames, int discPort, - DiscoveryMetricsProvider metricsProvider, IgniteProductVersion ver) { - assert id != null; - assert !F.isEmpty(addrs); - assert metricsProvider != null; - assert ver != null; - - this.id = id; - this.addrs = addrs; - this.hostNames = hostNames; - this.discPort = discPort; - this.metricsProvider = metricsProvider; - this.ver = ver; - - consistentId = U.consistentId(addrs, discPort); - - metrics = metricsProvider.getMetrics(); - sockAddrs = U.toSocketAddresses(this, discPort); - } - - /** {@inheritDoc} */ - @Override public UUID id() { - return id; - } - - /** {@inheritDoc} */ - @Override public Object consistentId() { - return consistentId; - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public <T> T attribute(String name) { - // Even though discovery SPI removes this attribute after authentication, keep this check for safety. - if (GridNodeAttributes.ATTR_SECURITY_CREDENTIALS.equals(name)) - return null; - - return (T)attrs.get(name); - } - - /** {@inheritDoc} */ - @Override public Map<String, Object> attributes() { - // Even though discovery SPI removes this attribute after authentication, keep this check for safety. - return F.view(attrs, new IgnitePredicate<String>() { - @Override public boolean apply(String s) { - return !GridNodeAttributes.ATTR_SECURITY_CREDENTIALS.equals(s); - } - }); - } - - /** - * Sets node attributes. - * - * @param attrs Node attributes. - */ - public void setAttributes(Map<String, Object> attrs) { - this.attrs = U.sealMap(attrs); - } - - /** - * Gets node attributes without filtering. - * - * @return Node attributes without filtering. - */ - public Map<String, Object> getAttributes() { - return attrs; - } - - /** {@inheritDoc} */ - @Override public ClusterNodeMetrics metrics() { - if (metricsProvider != null) - metrics = metricsProvider.getMetrics(); - - return metrics; - } - - /** - * Sets node metrics. - * - * @param metrics Node metrics. - */ - public void setMetrics(ClusterNodeMetrics metrics) { - assert metrics != null; - - this.metrics = metrics; - } - - /** - * @return Internal order. - */ - public long internalOrder() { - return intOrder; - } - - /** - * @param intOrder Internal order of the node. - */ - public void internalOrder(long intOrder) { - assert intOrder > 0; - - this.intOrder = intOrder; - } - - /** - * @return Order. - */ - @Override public long order() { - return order; - } - - /** - * @param order Order of the node. - */ - public void order(long order) { - assert order >= 0 : "Order is invalid: " + this; - - this.order = order; - } - - /** {@inheritDoc} */ - @Override public IgniteProductVersion version() { - return ver; - } - - /** - * @param ver Version. - */ - public void version(IgniteProductVersion ver) { - assert ver != null; - - this.ver = ver; - } - - /** {@inheritDoc} */ - @Override public Collection<String> addresses() { - return addrs; - } - - /** {@inheritDoc} */ - @Override public boolean isLocal() { - return loc; - } - - /** - * @param loc Grid local node flag. - */ - public void local(boolean loc) { - this.loc = loc; - } - - /** {@inheritDoc} */ - @Override public boolean isDaemon() { - return "true".equalsIgnoreCase((String)attribute(ATTR_DAEMON)); - } - - /** {@inheritDoc} */ - @Override public Collection<String> hostNames() { - return hostNames; - } - - /** - * @return Discovery port. - */ - public int discoveryPort() { - return discPort; - } - - /** - * @return Addresses that could be used by discovery. - */ - public Collection<InetSocketAddress> socketAddresses() { - return sockAddrs; - } - - /** - * Gets node last update time. - * - * @return Time of the last heartbeat. - */ - public long lastUpdateTime() { - return lastUpdateTime; - } - - /** - * Sets node last update. - * - * @param lastUpdateTime Time of last metrics update. - */ - public void lastUpdateTime(long lastUpdateTime) { - assert lastUpdateTime > 0; - - this.lastUpdateTime = lastUpdateTime; - } - - /** - * Gets visible flag. - * - * @return {@code true} if node is in visible state. - */ - public boolean visible() { - return visible; - } - - /** - * Sets visible flag. - * - * @param visible {@code true} if node is in visible state. - */ - public void visible(boolean visible) { - this.visible = visible; - } - - /** {@inheritDoc} */ - @Override public boolean isClient() { - return clientRouterNodeId != null; - } - - /** - * Decrements alive check value and returns new one. - * - * @return Alive check value. - */ - public int decrementAliveCheck() { - assert isClient(); - - return --aliveCheck; - } - - /** - * @param aliveCheck Alive check value. - */ - public void aliveCheck(int aliveCheck) { - assert isClient(); - - this.aliveCheck = aliveCheck; - } - - /** - * @return Client router node ID. - */ - public UUID clientRouterNodeId() { - return clientRouterNodeId; - } - - /** - * @param clientRouterNodeId Client router node ID. - */ - public void clientRouterNodeId(UUID clientRouterNodeId) { - this.clientRouterNodeId = clientRouterNodeId; - } - - /** {@inheritDoc} */ - @Override public int compareTo(@Nullable TcpDiscoveryNode node) { - if (node == null) - return 1; - - if (internalOrder() == node.internalOrder()) - assert id().equals(node.id()) : "Duplicate order [this=" + this + ", other=" + node + ']'; - - return internalOrder() < node.internalOrder() ? -1 : internalOrder() > node.internalOrder() ? 1 : - id().compareTo(node.id()); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeUuid(out, id); - U.writeMap(out, attrs); - U.writeCollection(out, addrs); - U.writeCollection(out, hostNames); - out.writeInt(discPort); - - byte[] mtr = null; - - if (metrics != null) { - mtr = new byte[DiscoveryMetricsHelper.METRICS_SIZE]; - - DiscoveryMetricsHelper.serialize(mtr, 0, metrics); - } - - U.writeByteArray(out, mtr); - - out.writeLong(order); - out.writeLong(intOrder); - out.writeObject(ver); - U.writeUuid(out, clientRouterNodeId); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - id = U.readUuid(in); - - attrs = U.sealMap(U.<String, Object>readMap(in)); - addrs = U.readCollection(in); - hostNames = U.readCollection(in); - discPort = in.readInt(); - - sockAddrs = U.toSocketAddresses(this, discPort); - - consistentId = U.consistentId(addrs, discPort); - - byte[] mtr = U.readByteArray(in); - - if (mtr != null) - metrics = DiscoveryMetricsHelper.deserialize(mtr, 0); - - order = in.readLong(); - intOrder = in.readLong(); - ver = (IgniteProductVersion)in.readObject(); - clientRouterNodeId = U.readUuid(in); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return id.hashCode(); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - return F.eqNodes(this, o); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(TcpDiscoveryNode.class, this, "isClient", isClient()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java deleted file mode 100644 index b9093c1..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java +++ /dev/null @@ -1,636 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.discovery.tcp.internal; - -import org.apache.ignite.cluster.*; -import org.apache.ignite.lang.*; -import org.gridgain.grid.util.tostring.*; -import org.gridgain.grid.util.typedef.*; -import org.gridgain.grid.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.util.*; -import java.util.concurrent.locks.*; - -/** - * Convenient way to represent topology for {@link org.gridgain.grid.spi.discovery.tcp.TcpDiscoverySpi} - */ -public class TcpDiscoveryNodesRing { - /** Visible nodes filter. */ - private static final IgnitePredicate<TcpDiscoveryNode> VISIBLE_NODES = new P1<TcpDiscoveryNode>() { - @Override public boolean apply(TcpDiscoveryNode node) { - return node.visible(); - } - }; - - /** Client nodes filter. */ - private static final PN CLIENT_NODES = new PN() { - @Override public boolean apply(ClusterNode node) { - return node.isClient(); - } - }; - - /** Local node. */ - private TcpDiscoveryNode locNode; - - /** All nodes in topology. */ - @GridToStringInclude - private NavigableSet<TcpDiscoveryNode> nodes = new TreeSet<>(); - - /** All started nodes. */ - @GridToStringExclude - private Map<UUID, TcpDiscoveryNode> nodesMap = new HashMap<>(); - - /** Current topology version */ - private long topVer; - - /** */ - private long nodeOrder; - - /** Lock. */ - @GridToStringExclude - private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); - - /** - * Sets local node. - * - * @param locNode Local node. - */ - public void localNode(TcpDiscoveryNode locNode) { - assert locNode != null; - - rwLock.writeLock().lock(); - - try { - this.locNode = locNode; - - clear(); - } - finally { - rwLock.writeLock().unlock(); - } - } - - /** - * Gets all nodes in the topology. - * - * @return Collection of all nodes. - */ - public Collection<TcpDiscoveryNode> allNodes() { - return nodes(); - } - - /** - * Gets visible nodes in the topology. - * - * @return Collection of visible nodes. - */ - public Collection<TcpDiscoveryNode> visibleNodes() { - return nodes(VISIBLE_NODES); - } - - /** - * Gets remote nodes. - * - * @return Collection of remote nodes in grid. - */ - public Collection<TcpDiscoveryNode> remoteNodes() { - return nodes(F.remoteNodes(locNode.id())); - } - - /** - * Gets visible remote nodes in the topology. - * - * @return Collection of visible remote nodes. - */ - public Collection<TcpDiscoveryNode> visibleRemoteNodes() { - return nodes(VISIBLE_NODES, F.remoteNodes(locNode.id())); - } - - /** - * @return Client nodes. - */ - public Collection<TcpDiscoveryNode> clientNodes() { - return nodes(CLIENT_NODES); - } - - /** - * Checks whether the topology has remote nodes in. - * - * @return {@code true} if the topology has remote nodes in. - */ - public boolean hasRemoteNodes() { - rwLock.readLock().lock(); - - try { - return nodes.size() > 1; - } - finally { - rwLock.readLock().unlock(); - } - } - - /** - * Adds node to topology, also initializes node last update time with current - * system time. - * - * @param node Node to add. - * @return {@code true} if such node was added and did not present previously in the topology. - */ - public boolean add(TcpDiscoveryNode node) { - assert node != null; - assert node.internalOrder() > 0; - - rwLock.writeLock().lock(); - - try { - if (nodesMap.containsKey(node.id())) - return false; - - assert node.internalOrder() > maxInternalOrder() : "Adding node to the middle of the ring " + - "[ring=" + this + ", node=" + node + ']'; - - nodesMap.put(node.id(), node); - - nodes = new TreeSet<>(nodes); - - node.lastUpdateTime(U.currentTimeMillis()); - - nodes.add(node); - - nodeOrder = node.internalOrder(); - } - finally { - rwLock.writeLock().unlock(); - } - - return true; - } - - /** - * @return Max internal order. - */ - public long maxInternalOrder() { - rwLock.readLock().lock(); - - try { - TcpDiscoveryNode last = nodes.last(); - - return last != null ? last.internalOrder() : -1; - } - finally { - rwLock.readLock().unlock(); - } - } - - /** - * Restores topology from parameters values. - * <p> - * This method is called when new node receives topology from coordinator. - * In this case all nodes received are remote for local. - * <p> - * Also initializes nodes last update time with current system time. - * - * @param nodes List of remote nodes. - * @param topVer Topology version. - */ - public void restoreTopology(Iterable<TcpDiscoveryNode> nodes, long topVer) { - assert !F.isEmpty(nodes); - assert topVer > 0; - - rwLock.writeLock().lock(); - - try { - locNode.internalOrder(topVer); - - clear(); - - boolean firstAdd = true; - - for (TcpDiscoveryNode node : nodes) { - if (nodesMap.containsKey(node.id())) - continue; - - nodesMap.put(node.id(), node); - - if (firstAdd) { - this.nodes = new TreeSet<>(this.nodes); - - firstAdd = false; - } - - node.lastUpdateTime(U.currentTimeMillis()); - - this.nodes.add(node); - } - - nodeOrder = topVer; - } - finally { - rwLock.writeLock().unlock(); - } - } - - /** - * Finds node by ID. - * - * @param nodeId Node id to find. - * @return Node with ID provided or {@code null} if not found. - */ - @Nullable public TcpDiscoveryNode node(UUID nodeId) { - assert nodeId != null; - - rwLock.readLock().lock(); - - try { - return nodesMap.get(nodeId); - } - finally { - rwLock.readLock().unlock(); - } - } - - /** - * Removes node from the topology. - * - * @param nodeId ID of the node to remove. - * @return {@code true} if node was removed. - */ - @Nullable public TcpDiscoveryNode removeNode(UUID nodeId) { - assert nodeId != null; - assert !locNode.id().equals(nodeId); - - rwLock.writeLock().lock(); - - try { - TcpDiscoveryNode rmv = nodesMap.remove(nodeId); - - if (rmv != null) { - nodes = new TreeSet<>(nodes); - - nodes.remove(rmv); - } - - return rmv; - } - finally { - rwLock.writeLock().unlock(); - } - } - - /** - * Removes nodes from the topology. - * - * @param nodeIds IDs of the nodes to remove. - * @return Collection of removed nodes. - */ - public Collection<TcpDiscoveryNode> removeNodes(Collection<UUID> nodeIds) { - assert !F.isEmpty(nodeIds); - - rwLock.writeLock().lock(); - - try { - boolean firstRmv = true; - - Collection<TcpDiscoveryNode> res = null; - - for (UUID id : nodeIds) { - TcpDiscoveryNode rmv = nodesMap.remove(id); - - if (rmv != null) { - if (firstRmv) { - nodes = new TreeSet<>(nodes); - - res = new ArrayList<>(nodeIds.size()); - - firstRmv = false; - } - - nodes.remove(rmv); - - res.add(rmv); - } - } - - return res == null ? Collections.<TcpDiscoveryNode>emptyList() : res; - } - finally { - rwLock.writeLock().unlock(); - } - } - - /** - * Removes all remote nodes, leaves only local node. - * <p> - * This should be called when SPI should be disconnected from topology and - * reconnected back after. - */ - public void clear() { - rwLock.writeLock().lock(); - - try { - nodes = new TreeSet<>(); - - if (locNode != null) - nodes.add(locNode); - - nodesMap = new HashMap<>(); - - if (locNode != null) - nodesMap.put(locNode.id(), locNode); - - nodeOrder = 0; - - topVer = 0; - } - finally { - rwLock.writeLock().unlock(); - } - } - - /** - * Finds coordinator in the topology. - * - * @return Coordinator node that gives versions to topology (node with the smallest order). - */ - @Nullable public TcpDiscoveryNode coordinator() { - rwLock.readLock().lock(); - - try { - if (F.isEmpty(nodes)) - return null; - - return coordinator(null); - } - finally { - rwLock.readLock().unlock(); - } - } - - /** - * Finds coordinator in the topology filtering excluded nodes from the search. - * <p> - * This may be used when handling current coordinator leave or failure. - * - * @param excluded Nodes to exclude from the search (optional). - * @return Coordinator node among remaining nodes or {@code null} if all nodes are excluded. - */ - @Nullable public TcpDiscoveryNode coordinator(@Nullable Collection<TcpDiscoveryNode> excluded) { - rwLock.readLock().lock(); - - try { - Collection<TcpDiscoveryNode> filtered = serverNodes(excluded); - - if (F.isEmpty(filtered)) - return null; - - return Collections.min(filtered); - } - finally { - rwLock.readLock().unlock(); - } - } - - /** - * Finds next node in the topology. - * - * @return Next node. - */ - @Nullable public TcpDiscoveryNode nextNode() { - rwLock.readLock().lock(); - - try { - if (nodes.size() < 2) - return null; - - return nextNode(null); - } - finally { - rwLock.readLock().unlock(); - } - } - - /** - * Finds next node in the topology filtering excluded nodes from search. - * <p> - * This may be used when detecting and handling nodes failure. - * - * @param excluded Nodes to exclude from the search (optional). If provided, - * cannot contain local node. - * @return Next node or {@code null} if all nodes were filtered out or - * topology contains less than two nodes. - */ - @Nullable public TcpDiscoveryNode nextNode(@Nullable Collection<TcpDiscoveryNode> excluded) { - assert excluded == null || excluded.isEmpty() || !excluded.contains(locNode); - - rwLock.readLock().lock(); - - try { - Collection<TcpDiscoveryNode> filtered = serverNodes(excluded); - - if (filtered.size() < 2) - return null; - - Iterator<TcpDiscoveryNode> iter = filtered.iterator(); - - while (iter.hasNext()) { - TcpDiscoveryNode node = iter.next(); - - if (locNode.equals(node)) - break; - } - - return iter.hasNext() ? iter.next() : F.first(filtered); - } - finally { - rwLock.readLock().unlock(); - } - } - - /** - * Finds previous node in the topology. - * - * @return Previous node. - */ - @Nullable public TcpDiscoveryNode previousNode() { - rwLock.readLock().lock(); - - try { - if (nodes.size() < 2) - return null; - - return previousNode(null); - } - finally { - rwLock.readLock().unlock(); - } - } - - /** - * Finds previous node in the topology filtering excluded nodes from search. - * - * @param excluded Nodes to exclude from the search (optional). If provided, - * cannot contain local node. - * @return Previous node or {@code null} if all nodes were filtered out or - * topology contains less than two nodes. - */ - @Nullable public TcpDiscoveryNode previousNode(@Nullable Collection<TcpDiscoveryNode> excluded) { - assert excluded == null || excluded.isEmpty() || !excluded.contains(locNode); - - rwLock.readLock().lock(); - - try { - Collection<TcpDiscoveryNode> filtered = serverNodes(excluded); - - if (filtered.size() < 2) - return null; - - Iterator<TcpDiscoveryNode> iter = filtered.iterator(); - - while (iter.hasNext()) { - TcpDiscoveryNode node = iter.next(); - - if (locNode.equals(node)) - break; - } - - return iter.hasNext() ? iter.next() : F.first(filtered); - } - finally { - rwLock.readLock().unlock(); - } - } - - /** - * Gets current topology version. - * - * @return Current topology version. - */ - public long topologyVersion() { - rwLock.readLock().lock(); - - try { - return topVer; - } - finally { - rwLock.readLock().unlock(); - } - } - - /** - * Sets new topology version. - * - * @param topVer New topology version (should be greater than current, otherwise no-op). - * @return {@code True} if topology has been changed. - */ - public boolean topologyVersion(long topVer) { - rwLock.writeLock().lock(); - - try { - if (this.topVer < topVer) { - this.topVer = topVer; - - return true; - } - - U.debug("KARAMBA [old=" + this.topVer + ", new=" + topVer + ']'); - - return false; - } - finally { - rwLock.writeLock().unlock(); - } - } - - /** - * Increments topology version and gets new value. - * - * @return Topology version (incremented). - */ - public long incrementTopologyVersion() { - rwLock.writeLock().lock(); - - try { - return ++topVer; - } - finally { - rwLock.writeLock().unlock(); - } - } - - /** - * Increments topology version and gets new value. - * - * @return Topology version (incremented). - */ - public long nextNodeOrder() { - rwLock.writeLock().lock(); - - try { - if (nodeOrder == 0) { - TcpDiscoveryNode last = nodes.last(); - - assert last != null; - - nodeOrder = last.internalOrder(); - } - - return ++nodeOrder; - } - finally { - rwLock.writeLock().unlock(); - } - } - - /** - * @param p Filters. - * @return Unmodifiable collection of nodes. - */ - private Collection<TcpDiscoveryNode> nodes(IgnitePredicate<? super TcpDiscoveryNode>... p) { - rwLock.readLock().lock(); - - try { - List<TcpDiscoveryNode> list = U.arrayList(nodes, p); - - return Collections.unmodifiableCollection(list); - } - finally { - rwLock.readLock().unlock(); - } - } - - /** - * Gets server nodes from topology. - * - * @param excluded Nodes to exclude from the search (optional). - * @return Collection of server nodes. - */ - private Collection<TcpDiscoveryNode> serverNodes(@Nullable final Collection<TcpDiscoveryNode> excluded) { - final boolean excludedEmpty = F.isEmpty(excluded); - - return F.view(nodes, new P1<TcpDiscoveryNode>() { - @Override public boolean apply(TcpDiscoveryNode node) { - return !node.isClient() && (excludedEmpty || !excluded.contains(node)); - } - }); - } - - /** {@inheritDoc} */ - @Override public String toString() { - rwLock.readLock().lock(); - - try { - return S.toString(TcpDiscoveryNodesRing.class, this); - } - finally { - rwLock.readLock().unlock(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/TcpDiscoverySpiState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/TcpDiscoverySpiState.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/TcpDiscoverySpiState.java deleted file mode 100644 index 7ed3e0a..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/TcpDiscoverySpiState.java +++ /dev/null @@ -1,45 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.discovery.tcp.internal; - -/** - * State of local node {@link org.gridgain.grid.spi.discovery.tcp.TcpDiscoverySpi}. - */ -public enum TcpDiscoverySpiState { - /** */ - DISCONNECTED, - - /** */ - CONNECTING, - - /** */ - CONNECTED, - - /** */ - DISCONNECTING, - - /** */ - STOPPING, - - /** */ - LEFT, - - /** */ - DUPLICATE_ID, - - /** */ - AUTH_FAILED, - - /** */ - CHECK_FAILED, - - /** */ - LOOPBACK_PROBLEM -}