# Renaming
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/191aae27 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/191aae27 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/191aae27 Branch: refs/heads/master Commit: 191aae279d9af268a951465ff89188837718d4ce Parents: 21eed42 Author: sboikov <sboi...@gridgain.com> Authored: Fri Dec 5 16:42:41 2014 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Fri Dec 5 16:42:41 2014 +0300 ---------------------------------------------------------------------- .../tcp/GridTcpDiscoverySpiAdapter.java | 997 ------------------- .../discovery/tcp/GridTcpDiscoverySpiMBean.java | 268 ----- .../discovery/tcp/TcpClientDiscoverySpi.java | 30 +- .../grid/spi/discovery/tcp/TcpDiscoverySpi.java | 132 +-- .../discovery/tcp/TcpDiscoverySpiAdapter.java | 997 +++++++++++++++++++ .../spi/discovery/tcp/TcpDiscoverySpiMBean.java | 268 +++++ .../tcp/internal/GridTcpDiscoveryNode.java | 443 -------- .../tcp/internal/GridTcpDiscoveryNodesRing.java | 636 ------------ .../tcp/internal/GridTcpDiscoverySpiState.java | 45 - .../internal/GridTcpDiscoveryStatistics.java | 639 ------------ .../tcp/internal/TcpDiscoveryNode.java | 443 ++++++++ .../tcp/internal/TcpDiscoveryNodesRing.java | 636 ++++++++++++ .../tcp/internal/TcpDiscoverySpiState.java | 45 + .../tcp/internal/TcpDiscoveryStatistics.java | 639 ++++++++++++ .../GridTcpDiscoveryDuplicateIdMessage.java | 8 +- .../GridTcpDiscoveryJoinRequestMessage.java | 8 +- .../GridTcpDiscoveryNodeAddedMessage.java | 14 +- .../GridTcpDiscoveryStatusCheckMessage.java | 8 +- ...unctionExcludeNeighborsAbstractSelfTest.java | 4 +- .../grid/spi/GridTcpSpiForwardingSelfTest.java | 2 +- .../discovery/tcp/GridTcpDiscoverySelfTest.java | 4 +- 21 files changed, 3133 insertions(+), 3133 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/191aae27/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/GridTcpDiscoverySpiAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/GridTcpDiscoverySpiAdapter.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/GridTcpDiscoverySpiAdapter.java deleted file mode 100644 index 61829fc..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/GridTcpDiscoverySpiAdapter.java +++ /dev/null @@ -1,997 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.discovery.tcp; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.marshaller.*; -import org.apache.ignite.marshaller.jdk.*; -import org.apache.ignite.product.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.spi.*; -import org.gridgain.grid.*; -import org.gridgain.grid.spi.discovery.*; -import org.gridgain.grid.spi.discovery.tcp.internal.*; -import org.gridgain.grid.spi.discovery.tcp.ipfinder.*; -import org.gridgain.grid.spi.discovery.tcp.ipfinder.multicast.*; -import org.gridgain.grid.spi.discovery.tcp.messages.*; -import org.gridgain.grid.util.*; -import org.gridgain.grid.util.io.*; -import org.gridgain.grid.util.typedef.*; -import org.gridgain.grid.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.net.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static org.gridgain.grid.spi.discovery.tcp.internal.GridTcpDiscoverySpiState.*; - -/** - * Base class for TCP discovery SPIs. - */ -abstract class GridTcpDiscoverySpiAdapter extends IgniteSpiAdapter implements DiscoverySpi { - /** Default port to listen (value is <tt>47500</tt>). */ - public static final int DFLT_PORT = 47500; - - /** Default socket operations timeout in milliseconds (value is <tt>2,000ms</tt>). */ - public static final long DFLT_SOCK_TIMEOUT = 2000; - - /** Default timeout for receiving message acknowledgement in milliseconds (value is <tt>5,000ms</tt>). */ - public static final long DFLT_ACK_TIMEOUT = 5000; - - /** Default network timeout in milliseconds (value is <tt>5,000ms</tt>). */ - public static final long DFLT_NETWORK_TIMEOUT = 5000; - - /** Default value for thread priority (value is <tt>10</tt>). */ - public static final int DFLT_THREAD_PRI = 10; - - /** Default heartbeat messages issuing frequency (value is <tt>2,000ms</tt>). */ - public static final long DFLT_HEARTBEAT_FREQ = 2000; - - /** Default size of topology snapshots history. */ - public static final int DFLT_TOP_HISTORY_SIZE = 1000; - - /** Response OK. */ - protected static final int RES_OK = 1; - - /** Response CONTINUE JOIN. */ - protected static final int RES_CONTINUE_JOIN = 100; - - /** Response WAIT. */ - protected static final int RES_WAIT = 200; - - /** Local address. */ - protected String locAddr; - - /** IP finder. */ - protected GridTcpDiscoveryIpFinder ipFinder; - - /** Socket operations timeout. */ - protected long sockTimeout = DFLT_SOCK_TIMEOUT; - - /** Message acknowledgement timeout. */ - protected long ackTimeout = DFLT_ACK_TIMEOUT; - - /** Network timeout. */ - protected long netTimeout = DFLT_NETWORK_TIMEOUT; - - /** Thread priority for all threads started by SPI. */ - protected int threadPri = DFLT_THREAD_PRI; - - /** Heartbeat messages issuing frequency. */ - protected long hbFreq = DFLT_HEARTBEAT_FREQ; - - /** Size of topology snapshots history. */ - protected int topHistSize = DFLT_TOP_HISTORY_SIZE; - - /** Grid discovery listener. */ - protected volatile DiscoverySpiListener lsnr; - - /** Data exchange. */ - protected DiscoverySpiDataExchange exchange; - - /** Metrics provider. */ - protected DiscoveryMetricsProvider metricsProvider; - - /** Local node attributes. */ - protected Map<String, Object> locNodeAttrs; - - /** Local node version. */ - protected IgniteProductVersion locNodeVer; - - /** Local node. */ - protected GridTcpDiscoveryNode locNode; - - /** Local host. */ - protected InetAddress locHost; - - /** Internal and external addresses of local node. */ - protected Collection<InetSocketAddress> locNodeAddrs; - - /** Socket timeout worker. */ - protected SocketTimeoutWorker sockTimeoutWorker; - - /** Discovery state. */ - protected GridTcpDiscoverySpiState spiState = DISCONNECTED; - - /** Start time of the very first grid node. */ - protected volatile long gridStartTime; - - /** Marshaller. */ - protected final IgniteMarshaller marsh = new IgniteJdkMarshaller(); - - /** Statistics. */ - protected final GridTcpDiscoveryStatistics stats = new GridTcpDiscoveryStatistics(); - - /** Local node ID. */ - @IgniteLocalNodeIdResource - protected UUID locNodeId; - - /** Name of the grid. */ - @IgniteNameResource - protected String gridName; - - /** Logger. */ - @IgniteLoggerResource - protected IgniteLogger log; - - /** - * Sets local host IP address that discovery SPI uses. - * <p> - * If not provided, by default a first found non-loopback address - * will be used. If there is no non-loopback address available, - * then {@link InetAddress#getLocalHost()} will be used. - * - * @param locAddr IP address. - */ - @IgniteSpiConfiguration(optional = true) - @IgniteLocalHostResource - public void setLocalAddress(String locAddr) { - // Injection should not override value already set by Spring or user. - if (this.locAddr == null) - this.locAddr = locAddr; - } - - /** - * Gets local address that was set to SPI with {@link #setLocalAddress(String)} method. - * - * @return local address. - */ - public String getLocalAddress() { - return locAddr; - } - - /** - * Gets IP finder for IP addresses sharing and storing. - * - * @return IP finder for IP addresses sharing and storing. - */ - public GridTcpDiscoveryIpFinder getIpFinder() { - return ipFinder; - } - - /** - * Sets IP finder for IP addresses sharing and storing. - * <p> - * If not provided {@link GridTcpDiscoveryMulticastIpFinder} will be used by default. - * - * @param ipFinder IP finder. - */ - @IgniteSpiConfiguration(optional = true) - public void setIpFinder(GridTcpDiscoveryIpFinder ipFinder) { - this.ipFinder = ipFinder; - } - - /** - * Sets socket operations timeout. This timeout is used to limit connection time and - * write-to-socket time. - * <p> - * Note that when running GridGain on Amazon EC2, socket timeout must be set to a value - * significantly greater than the default (e.g. to {@code 30000}). - * <p> - * If not specified, default is {@link #DFLT_SOCK_TIMEOUT}. - * - * @param sockTimeout Socket connection timeout. - */ - @IgniteSpiConfiguration(optional = true) - public void setSocketTimeout(long sockTimeout) { - this.sockTimeout = sockTimeout; - } - - /** - * Sets timeout for receiving acknowledgement for sent message. - * <p> - * If acknowledgement is not received within this timeout, sending is considered as failed - * and SPI tries to repeat message sending. - * <p> - * If not specified, default is {@link #DFLT_ACK_TIMEOUT}. - * - * @param ackTimeout Acknowledgement timeout. - */ - @IgniteSpiConfiguration(optional = true) - public void setAckTimeout(long ackTimeout) { - this.ackTimeout = ackTimeout; - } - - /** - * Sets maximum network timeout to use for network operations. - * <p> - * If not specified, default is {@link #DFLT_NETWORK_TIMEOUT}. - * - * @param netTimeout Network timeout. - */ - @IgniteSpiConfiguration(optional = true) - public void setNetworkTimeout(long netTimeout) { - this.netTimeout = netTimeout; - } - - /** - * Sets thread priority. All threads within SPI will be started with it. - * <p> - * If not provided, default value is {@link #DFLT_THREAD_PRI} - * - * @param threadPri Thread priority. - */ - @IgniteSpiConfiguration(optional = true) - public void setThreadPriority(int threadPri) { - this.threadPri = threadPri; - } - - /** - * Sets delay between issuing of heartbeat messages. SPI sends heartbeat messages - * in configurable time interval to other nodes to notify them about its state. - * <p> - * If not provided, default value is {@link #DFLT_HEARTBEAT_FREQ}. - * - * @param hbFreq Heartbeat frequency in milliseconds. - */ - @IgniteSpiConfiguration(optional = true) - public void setHeartbeatFrequency(long hbFreq) { - this.hbFreq = hbFreq; - } - - /** - * @return Size of topology snapshots history. - */ - public long getTopHistorySize() { - return topHistSize; - } - - /** - * Sets size of topology snapshots history. Specified size should be greater than or equal to default size - * {@link #DFLT_TOP_HISTORY_SIZE}. - * - * @param topHistSize Size of topology snapshots history. - */ - @IgniteSpiConfiguration(optional = true) - public void setTopHistorySize(int topHistSize) { - if (topHistSize < DFLT_TOP_HISTORY_SIZE) { - U.warn(log, "Topology history size should be greater than or equal to default size. " + - "Specified size will not be set [curSize=" + this.topHistSize + ", specifiedSize=" + topHistSize + - ", defaultSize=" + DFLT_TOP_HISTORY_SIZE + ']'); - - return; - } - - this.topHistSize = topHistSize; - } - - /** {@inheritDoc} */ - @Override public void setNodeAttributes(Map<String, Object> attrs, IgniteProductVersion ver) { - assert locNodeAttrs == null; - assert locNodeVer == null; - - if (log.isDebugEnabled()) { - log.debug("Node attributes to set: " + attrs); - log.debug("Node version to set: " + ver); - } - - locNodeAttrs = attrs; - locNodeVer = ver; - } - - /** {@inheritDoc} */ - @Override protected void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException { - super.onContextInitialized0(spiCtx); - - ipFinder.onSpiContextInitialized(spiCtx); - } - - /** {@inheritDoc} */ - @Override protected void onContextDestroyed0() { - super.onContextDestroyed0(); - - ipFinder.onSpiContextDestroyed(); - } - - /** {@inheritDoc} */ - @Override public ClusterNode getLocalNode() { - return locNode; - } - - /** {@inheritDoc} */ - @Override public void setListener(@Nullable DiscoverySpiListener lsnr) { - this.lsnr = lsnr; - } - - /** {@inheritDoc} */ - @Override public void setDataExchange(DiscoverySpiDataExchange exchange) { - this.exchange = exchange; - } - - /** {@inheritDoc} */ - @Override public void setMetricsProvider(DiscoveryMetricsProvider metricsProvider) { - this.metricsProvider = metricsProvider; - } - - /** {@inheritDoc} */ - @Override public long getGridStartTime() { - assert gridStartTime != 0; - - return gridStartTime; - } - - /** - * @param sockAddr Remote address. - * @return Opened socket. - * @throws IOException If failed. - */ - protected Socket openSocket(InetSocketAddress sockAddr) throws IOException { - assert sockAddr != null; - - InetSocketAddress resolved = sockAddr.isUnresolved() ? - new InetSocketAddress(InetAddress.getByName(sockAddr.getHostName()), sockAddr.getPort()) : sockAddr; - - InetAddress addr = resolved.getAddress(); - - assert addr != null; - - Socket sock = new Socket(); - - sock.bind(new InetSocketAddress(locHost, 0)); - - sock.setTcpNoDelay(true); - - sock.connect(resolved, (int)sockTimeout); - - writeToSocket(sock, U.GG_HEADER); - - return sock; - } - - /** - * Writes message to the socket. - * - * @param sock Socket. - * @param data Raw data to write. - * @throws IOException If IO failed or write timed out. - */ - @SuppressWarnings("ThrowFromFinallyBlock") - protected void writeToSocket(Socket sock, byte[] data) throws IOException { - assert sock != null; - assert data != null; - - SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout); - - sockTimeoutWorker.addTimeoutObject(obj); - - IOException err = null; - - try { - OutputStream out = sock.getOutputStream(); - - out.write(data); - - out.flush(); - } - catch (IOException e) { - err = e; - } - finally { - boolean cancelled = obj.cancel(); - - if (cancelled) - sockTimeoutWorker.removeTimeoutObject(obj); - - // Throw original exception. - if (err != null) - throw err; - - if (!cancelled) - throw new SocketTimeoutException("Write timed out (socket was concurrently closed)."); - } - } - - /** - * Writes message to the socket. - * - * @param sock Socket. - * @param msg Message. - * @throws IOException If IO failed or write timed out. - * @throws GridException If marshalling failed. - */ - protected void writeToSocket(Socket sock, GridTcpDiscoveryAbstractMessage msg) throws IOException, GridException { - writeToSocket(sock, msg, new GridByteArrayOutputStream(8 * 1024)); // 8K. - } - - /** - * Writes message to the socket. - * - * @param sock Socket. - * @param msg Message. - * @param bout Byte array output stream. - * @throws IOException If IO failed or write timed out. - * @throws GridException If marshalling failed. - */ - @SuppressWarnings("ThrowFromFinallyBlock") - protected void writeToSocket(Socket sock, GridTcpDiscoveryAbstractMessage msg, GridByteArrayOutputStream bout) - throws IOException, GridException { - assert sock != null; - assert msg != null; - assert bout != null; - - // Marshall message first to perform only write after. - marsh.marshal(msg, bout); - - SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout); - - sockTimeoutWorker.addTimeoutObject(obj); - - IOException err = null; - - try { - OutputStream out = sock.getOutputStream(); - - bout.writeTo(out); - - out.flush(); - } - catch (IOException e) { - err = e; - } - finally { - boolean cancelled = obj.cancel(); - - if (cancelled) - sockTimeoutWorker.removeTimeoutObject(obj); - - // Throw original exception. - if (err != null) - throw err; - - if (!cancelled) - throw new SocketTimeoutException("Write timed out (socket was concurrently closed)."); - } - } - - /** - * Writes response to the socket. - * - * @param sock Socket. - * @param res Integer response. - * @throws IOException If IO failed or write timed out. - */ - @SuppressWarnings("ThrowFromFinallyBlock") - protected void writeToSocket(Socket sock, int res) throws IOException { - assert sock != null; - - SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout); - - sockTimeoutWorker.addTimeoutObject(obj); - - OutputStream out = sock.getOutputStream(); - - IOException err = null; - - try { - out.write(res); - - out.flush(); - } - catch (IOException e) { - err = e; - } - finally { - boolean cancelled = obj.cancel(); - - if (cancelled) - sockTimeoutWorker.removeTimeoutObject(obj); - - // Throw original exception. - if (err != null) - throw err; - - if (!cancelled) - throw new SocketTimeoutException("Write timed out (socket was concurrently closed)."); - } - } - - /** - * Reads message from the socket limiting read time. - * - * @param sock Socket. - * @param in Input stream (in case socket stream was wrapped). - * @param timeout Socket timeout for this operation. - * @return Message. - * @throws IOException If IO failed or read timed out. - * @throws GridException If unmarshalling failed. - */ - protected <T> T readMessage(Socket sock, @Nullable InputStream in, long timeout) throws IOException, GridException { - assert sock != null; - - int oldTimeout = sock.getSoTimeout(); - - try { - sock.setSoTimeout((int)timeout); - - return marsh.unmarshal(in == null ? sock.getInputStream() : in, U.gridClassLoader()); - } - catch (IOException | GridException e) { - if (X.hasCause(e, SocketTimeoutException.class)) - LT.warn(log, null, "Timed out waiting for message to be read (most probably, the reason is " + - "in long GC pauses on remote node. Current timeout: " + timeout + '.'); - - throw e; - } - finally { - // Quietly restore timeout. - try { - sock.setSoTimeout(oldTimeout); - } - catch (SocketException ignored) { - // No-op. - } - } - } - - /** - * Reads message delivery receipt from the socket. - * - * @param sock Socket. - * @param timeout Socket timeout for this operation. - * @return Receipt. - * @throws IOException If IO failed or read timed out. - */ - protected int readReceipt(Socket sock, long timeout) throws IOException { - assert sock != null; - - int oldTimeout = sock.getSoTimeout(); - - try { - sock.setSoTimeout((int)timeout); - - int res = sock.getInputStream().read(); - - if (res == -1) - throw new EOFException(); - - return res; - } - catch (SocketTimeoutException e) { - LT.warn(log, null, "Timed out waiting for message delivery receipt (most probably, the reason is " + - "in long GC pauses on remote node; consider tuning GC and increasing 'ackTimeout' " + - "configuration property). Will retry to send message with increased timeout. " + - "Current timeout: " + timeout + '.'); - - stats.onAckTimeout(); - - throw e; - } - finally { - // Quietly restore timeout. - try { - sock.setSoTimeout(oldTimeout); - } - catch (SocketException ignored) { - // No-op. - } - } - } - - /** - * Resolves addresses registered in the IP finder, removes duplicates and local host - * address and returns the collection of. - * - * @return Resolved addresses without duplicates and local address (potentially - * empty but never null). - * @throws org.apache.ignite.spi.IgniteSpiException If an error occurs. - */ - protected Collection<InetSocketAddress> resolvedAddresses() throws IgniteSpiException { - List<InetSocketAddress> res = new ArrayList<>(); - - Collection<InetSocketAddress> addrs; - - // Get consistent addresses collection. - while (true) { - try { - addrs = registeredAddresses(); - - break; - } - catch (IgniteSpiException e) { - LT.error(log, e, "Failed to get registered addresses from IP finder on start " + - "(retrying every 2000 ms)."); - } - - try { - U.sleep(2000); - } - catch (GridInterruptedException e) { - throw new IgniteSpiException("Thread has been interrupted.", e); - } - } - - for (InetSocketAddress addr : addrs) { - assert addr != null; - - try { - InetSocketAddress resolved = addr.isUnresolved() ? - new InetSocketAddress(InetAddress.getByName(addr.getHostName()), addr.getPort()) : addr; - - if (locNodeAddrs == null || !locNodeAddrs.contains(resolved)) - res.add(resolved); - } - catch (UnknownHostException ignored) { - LT.warn(log, null, "Failed to resolve address from IP finder (host is unknown): " + addr); - - // Add address in any case. - res.add(addr); - } - } - - if (!res.isEmpty()) - Collections.shuffle(res); - - return res; - } - - /** - * Gets addresses registered in the IP finder, initializes addresses having no - * port (or 0 port) with {@link #DFLT_PORT}. - * - * @return Registered addresses. - * @throws org.apache.ignite.spi.IgniteSpiException If an error occurs. - */ - protected Collection<InetSocketAddress> registeredAddresses() throws IgniteSpiException { - Collection<InetSocketAddress> res = new LinkedList<>(); - - for (InetSocketAddress addr : ipFinder.getRegisteredAddresses()) { - if (addr.getPort() == 0) - addr = addr.isUnresolved() ? new InetSocketAddress(addr.getHostName(), DFLT_PORT) : - new InetSocketAddress(addr.getAddress(), DFLT_PORT); - - res.add(addr); - } - - return res; - } - - /** - * @param msg Message. - * @return Error. - */ - protected IgniteSpiException duplicateIdError(GridTcpDiscoveryDuplicateIdMessage msg) { - assert msg != null; - - return new IgniteSpiException("Local node has the same ID as existing node in topology " + - "(fix configuration and restart local node) [localNode=" + locNode + - ", existingNode=" + msg.node() + ']'); - } - - /** - * @param msg Message. - * @return Error. - */ - protected IgniteSpiException authenticationFailedError(GridTcpDiscoveryAuthFailedMessage msg) { - assert msg != null; - - return new IgniteSpiException(new GridAuthenticationException("Authentication failed [nodeId=" + - msg.creatorNodeId() + ", addr=" + msg.address().getHostAddress() + ']')); - } - - /** - * @param msg Message. - * @return Error. - */ - protected IgniteSpiException checkFailedError(GridTcpDiscoveryCheckFailedMessage msg) { - assert msg != null; - - return versionCheckFailed(msg) ? new IgniteSpiVersionCheckException(msg.error()) : - new IgniteSpiException(msg.error()); - } - - /** - * @param msg Message. - * @return Whether delivery of the message is ensured. - */ - protected boolean ensured(GridTcpDiscoveryAbstractMessage msg) { - return U.getAnnotation(msg.getClass(), GridTcpDiscoveryEnsureDelivery.class) != null; - } - - /** - * @param msg Failed message. - * @return {@code True} if specified failed message relates to version incompatibility, {@code false} otherwise. - * @deprecated Parsing of error message was used for preserving backward compatibility. We should remove it - * and create separate message for failed version check with next major release. - */ - @Deprecated - private static boolean versionCheckFailed(GridTcpDiscoveryCheckFailedMessage msg) { - return msg.error().contains("versions are not compatible"); - } - - /** - * Handles sockets timeouts. - */ - protected class SocketTimeoutWorker extends IgniteSpiThread { - /** Time-based sorted set for timeout objects. */ - private final GridConcurrentSkipListSet<SocketTimeoutObject> timeoutObjs = - new GridConcurrentSkipListSet<>(new Comparator<SocketTimeoutObject>() { - @Override public int compare(SocketTimeoutObject o1, SocketTimeoutObject o2) { - long time1 = o1.endTime(); - long time2 = o2.endTime(); - - long id1 = o1.id(); - long id2 = o2.id(); - - return time1 < time2 ? -1 : time1 > time2 ? 1 : - id1 < id2 ? -1 : id1 > id2 ? 1 : 0; - } - }); - - /** Mutex. */ - private final Object mux0 = new Object(); - - /** - * - */ - SocketTimeoutWorker() { - super(gridName, "tcp-disco-sock-timeout-worker", log); - - setPriority(threadPri); - } - - /** - * @param timeoutObj Timeout object to add. - */ - @SuppressWarnings({"NakedNotify"}) - public void addTimeoutObject(SocketTimeoutObject timeoutObj) { - assert timeoutObj != null && timeoutObj.endTime() > 0 && timeoutObj.endTime() != Long.MAX_VALUE; - - timeoutObjs.add(timeoutObj); - - if (timeoutObjs.firstx() == timeoutObj) { - synchronized (mux0) { - mux0.notifyAll(); - } - } - } - - /** - * @param timeoutObj Timeout object to remove. - */ - public void removeTimeoutObject(SocketTimeoutObject timeoutObj) { - assert timeoutObj != null; - - timeoutObjs.remove(timeoutObj); - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException { - if (log.isDebugEnabled()) - log.debug("Socket timeout worker has been started."); - - while (!isInterrupted()) { - long now = U.currentTimeMillis(); - - for (Iterator<SocketTimeoutObject> iter = timeoutObjs.iterator(); iter.hasNext(); ) { - SocketTimeoutObject timeoutObj = iter.next(); - - if (timeoutObj.endTime() <= now) { - iter.remove(); - - if (timeoutObj.onTimeout()) { - LT.warn(log, null, "Socket write has timed out (consider increasing " + - "'sockTimeout' configuration property) [sockTimeout=" + sockTimeout + ']'); - - stats.onSocketTimeout(); - } - } - else - break; - } - - synchronized (mux0) { - while (true) { - // Access of the first element must be inside of - // synchronization block, so we don't miss out - // on thread notification events sent from - // 'addTimeoutObject(..)' method. - SocketTimeoutObject first = timeoutObjs.firstx(); - - if (first != null) { - long waitTime = first.endTime() - U.currentTimeMillis(); - - if (waitTime > 0) - mux0.wait(waitTime); - else - break; - } - else - mux0.wait(5000); - } - } - } - } - } - - /** - * Socket timeout object. - */ - protected static class SocketTimeoutObject { - /** */ - private static final AtomicLong idGen = new AtomicLong(); - - /** */ - private final long id = idGen.incrementAndGet(); - - /** */ - private final Socket sock; - - /** */ - private final long endTime; - - /** */ - private final AtomicBoolean done = new AtomicBoolean(); - - /** - * @param sock Socket. - * @param endTime End time. - */ - SocketTimeoutObject(Socket sock, long endTime) { - assert sock != null; - assert endTime > 0; - - this.sock = sock; - this.endTime = endTime; - } - - /** - * @return {@code True} if object has not yet been processed. - */ - boolean cancel() { - return done.compareAndSet(false, true); - } - - /** - * @return {@code True} if object has not yet been canceled. - */ - boolean onTimeout() { - if (done.compareAndSet(false, true)) { - // Close socket - timeout occurred. - U.closeQuiet(sock); - - return true; - } - - return false; - } - - /** - * @return End time. - */ - long endTime() { - return endTime; - } - - /** - * @return ID. - */ - long id() { - return id; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(SocketTimeoutObject.class, this); - } - } - - /** - * Base class for message workers. - */ - protected abstract class MessageWorkerAdapter extends IgniteSpiThread { - /** Pre-allocated output stream (100K). */ - private final GridByteArrayOutputStream bout = new GridByteArrayOutputStream(100 * 1024); - - /** Message queue. */ - private final BlockingDeque<GridTcpDiscoveryAbstractMessage> queue = new LinkedBlockingDeque<>(); - - /** Backed interrupted flag. */ - private volatile boolean interrupted; - - /** - * @param name Thread name. - */ - protected MessageWorkerAdapter(String name) { - super(gridName, name, log); - - setPriority(threadPri); - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException { - if (log.isDebugEnabled()) - log.debug("Message worker started [locNodeId=" + locNodeId + ']'); - - while (!isInterrupted()) { - GridTcpDiscoveryAbstractMessage msg = queue.poll(2000, TimeUnit.MILLISECONDS); - - if (msg == null) - continue; - - processMessage(msg); - } - } - - /** {@inheritDoc} */ - @Override public void interrupt() { - interrupted = true; - - super.interrupt(); - } - - /** {@inheritDoc} */ - @Override public boolean isInterrupted() { - return interrupted || super.isInterrupted(); - } - - /** - * @return Current queue size. - */ - int queueSize() { - return queue.size(); - } - - /** - * Adds message to queue. - * - * @param msg Message to add. - */ - void addMessage(GridTcpDiscoveryAbstractMessage msg) { - assert msg != null; - - if (msg instanceof GridTcpDiscoveryHeartbeatMessage) - queue.addFirst(msg); - else - queue.add(msg); - - if (log.isDebugEnabled()) - log.debug("Message has been added to queue: " + msg); - } - - protected abstract void processMessage(GridTcpDiscoveryAbstractMessage msg); - - /** - * @param sock Socket. - * @param msg Message. - * @throws IOException If IO failed. - * @throws GridException If marshalling failed. - */ - protected final void writeToSocket(Socket sock, GridTcpDiscoveryAbstractMessage msg) - throws IOException, GridException { - bout.reset(); - - GridTcpDiscoverySpiAdapter.this.writeToSocket(sock, msg, bout); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/191aae27/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/GridTcpDiscoverySpiMBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/GridTcpDiscoverySpiMBean.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/GridTcpDiscoverySpiMBean.java deleted file mode 100644 index a680fc2..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/GridTcpDiscoverySpiMBean.java +++ /dev/null @@ -1,268 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.discovery.tcp; - -import org.apache.ignite.mbean.*; -import org.apache.ignite.spi.*; -import org.gridgain.grid.spi.discovery.tcp.ipfinder.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Management bean for {@link TcpDiscoverySpi}. - */ -public interface GridTcpDiscoverySpiMBean extends IgniteSpiManagementMBean { - /** - * Gets delay between heartbeat messages sent by coordinator. - * - * @return Time period in milliseconds. - */ - @IgniteMBeanDescription("Heartbeat frequency.") - public long getHeartbeatFrequency(); - - /** - * Gets current SPI state. - * - * @return Current SPI state. - */ - @IgniteMBeanDescription("SPI state.") - public String getSpiState(); - - /** - * Gets {@link GridTcpDiscoveryIpFinder} (string representation). - * - * @return IPFinder (string representation). - */ - @IgniteMBeanDescription("IP Finder.") - public String getIpFinderFormatted(); - - /** - * Gets number of connection attempts. - * - * @return Number of connection attempts. - */ - @IgniteMBeanDescription("Reconnect count.") - public int getReconnectCount(); - - /** - * Gets network timeout. - * - * @return Network timeout. - */ - @IgniteMBeanDescription("Network timeout.") - public long getNetworkTimeout(); - - /** - * Gets local TCP port SPI listens to. - * - * @return Local port range. - */ - @IgniteMBeanDescription("Local TCP port.") - public int getLocalPort(); - - /** - * Gets local TCP port range. - * - * @return Local port range. - */ - @IgniteMBeanDescription("Local TCP port range.") - public int getLocalPortRange(); - - /** - * Gets max heartbeats count node can miss without initiating status check. - * - * @return Max missed heartbeats. - */ - @IgniteMBeanDescription("Max missed heartbeats.") - public int getMaxMissedHeartbeats(); - - /** - * Gets max heartbeats count node can miss without failing client node. - * - * @return Max missed client heartbeats. - */ - @IgniteMBeanDescription("Max missed client heartbeats.") - public int getMaxMissedClientHeartbeats(); - - /** - * Gets thread priority. All threads within SPI will be started with it. - * - * @return Thread priority. - */ - @IgniteMBeanDescription("Threads priority.") - public int getThreadPriority(); - - /** - * Gets IP finder clean frequency. - * - * @return IP finder clean frequency. - */ - @IgniteMBeanDescription("IP finder clean frequency.") - public long getIpFinderCleanFrequency(); - - /** - * Gets statistics print frequency. - * - * @return Statistics print frequency in milliseconds. - */ - @IgniteMBeanDescription("Statistics print frequency.") - public long getStatisticsPrintFrequency(); - - /** - * Gets message worker queue current size. - * - * @return Message worker queue current size. - */ - @IgniteMBeanDescription("Message worker queue current size.") - public int getMessageWorkerQueueSize(); - - /** - * Gets joined nodes count. - * - * @return Nodes joined count. - */ - @IgniteMBeanDescription("Nodes joined count.") - public long getNodesJoined(); - - /** - * Gets left nodes count. - * - * @return Left nodes count. - */ - @IgniteMBeanDescription("Nodes left count.") - public long getNodesLeft(); - - /** - * Gets failed nodes count. - * - * @return Failed nodes count. - */ - @IgniteMBeanDescription("Nodes failed count.") - public long getNodesFailed(); - - /** - * Gets pending messages registered count. - * - * @return Pending messages registered count. - */ - @IgniteMBeanDescription("Pending messages registered.") - public long getPendingMessagesRegistered(); - - /** - * Gets pending messages discarded count. - * - * @return Pending messages registered count. - */ - @IgniteMBeanDescription("Pending messages discarded.") - public long getPendingMessagesDiscarded(); - - /** - * Gets avg message processing time. - * - * @return Avg message processing time. - */ - @IgniteMBeanDescription("Avg message processing time.") - public long getAvgMessageProcessingTime(); - - /** - * Gets max message processing time. - * - * @return Max message processing time. - */ - @IgniteMBeanDescription("Max message processing time.") - public long getMaxMessageProcessingTime(); - - /** - * Gets total received messages count. - * - * @return Total received messages count. - */ - @IgniteMBeanDescription("Total received messages count.") - public int getTotalReceivedMessages(); - - /** - * Gets received messages counts (grouped by type). - * - * @return Map containing message types and respective counts. - */ - @IgniteMBeanDescription("Received messages by type.") - public Map<String, Integer> getReceivedMessages(); - - /** - * Gets total processed messages count. - * - * @return Total processed messages count. - */ - @IgniteMBeanDescription("Total processed messages count.") - public int getTotalProcessedMessages(); - - /** - * Gets processed messages counts (grouped by type). - * - * @return Map containing message types and respective counts. - */ - @IgniteMBeanDescription("Received messages by type.") - public Map<String, Integer> getProcessedMessages(); - - /** - * Gets time local node has been coordinator since. - * - * @return Time local node is coordinator since. - */ - @IgniteMBeanDescription("Local node is coordinator since.") - public long getCoordinatorSinceTimestamp(); - - /** - * Gets current coordinator. - * - * @return Gets current coordinator. - */ - @IgniteMBeanDescription("Coordinator node ID.") - @Nullable public UUID getCoordinator(); - - /** - * Gets message acknowledgement timeout. - * - * @return Message acknowledgement timeout. - */ - @IgniteMBeanDescription("Message acknowledgement timeout.") - public long getAckTimeout(); - - /** - * Gets maximum message acknowledgement timeout. - * - * @return Maximum message acknowledgement timeout. - */ - @IgniteMBeanDescription("Maximum message acknowledgement timeout.") - public long getMaxAckTimeout(); - - /** - * Gets socket timeout. - * - * @return Socket timeout. - */ - @IgniteMBeanDescription("Socket timeout.") - public long getSocketTimeout(); - - /** - * Gets join timeout. - * - * @return Join timeout. - */ - @IgniteMBeanDescription("Join timeout.") - public long getJoinTimeout(); - - /** - * Dumps debug info using configured logger. - */ - @IgniteMBeanDescription("Dump debug info.") - public void dumpDebugInfo(); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/191aae27/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpClientDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpClientDiscoverySpi.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpClientDiscoverySpi.java index d04421d..33ec848 100644 --- a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpClientDiscoverySpi.java +++ b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpClientDiscoverySpi.java @@ -47,12 +47,12 @@ import static org.gridgain.grid.spi.discovery.tcp.messages.GridTcpDiscoveryHeart @IgniteSpiMultipleInstancesSupport(true) @DiscoverySpiOrderSupport(true) @DiscoverySpiHistorySupport(true) -public class TcpClientDiscoverySpi extends GridTcpDiscoverySpiAdapter implements TcpClientDiscoverySpiMBean { +public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpClientDiscoverySpiMBean { /** Default disconnect check interval. */ public static final long DFLT_DISCONNECT_CHECK_INT = 2000; /** Remote nodes. */ - private final ConcurrentMap<UUID, GridTcpDiscoveryNode> rmtNodes = new ConcurrentHashMap8<>(); + private final ConcurrentMap<UUID, TcpDiscoveryNode> rmtNodes = new ConcurrentHashMap8<>(); /** Socket. */ private volatile Socket sock; @@ -245,7 +245,7 @@ public class TcpClientDiscoverySpi extends GridTcpDiscoverySpiAdapter implements throw new IgniteSpiException("Failed to resolve local host to set of external addresses: " + locHost, e); } - locNode = new GridTcpDiscoveryNode( + locNode = new TcpDiscoveryNode( locNodeId, addrs.get1(), addrs.get2(), @@ -326,8 +326,8 @@ public class TcpClientDiscoverySpi extends GridTcpDiscoverySpiAdapter implements /** {@inheritDoc} */ @Override public Collection<ClusterNode> getRemoteNodes() { - return F.view(U.<GridTcpDiscoveryNode, ClusterNode>arrayList(rmtNodes.values(), new P1<GridTcpDiscoveryNode>() { - @Override public boolean apply(GridTcpDiscoveryNode node) { + return F.view(U.<TcpDiscoveryNode, ClusterNode>arrayList(rmtNodes.values(), new P1<TcpDiscoveryNode>() { + @Override public boolean apply(TcpDiscoveryNode node) { return node.visible(); } })); @@ -338,7 +338,7 @@ public class TcpClientDiscoverySpi extends GridTcpDiscoverySpiAdapter implements if (locNodeId.equals(nodeId)) return locNode; - GridTcpDiscoveryNode node = rmtNodes.get(nodeId); + TcpDiscoveryNode node = rmtNodes.get(nodeId); return node != null && node.visible() ? node : null; } @@ -350,7 +350,7 @@ public class TcpClientDiscoverySpi extends GridTcpDiscoverySpiAdapter implements if (nodeId.equals(locNodeId)) return true; - GridTcpDiscoveryNode node = rmtNodes.get(nodeId); + TcpDiscoveryNode node = rmtNodes.get(nodeId); return node != null && node.visible(); } @@ -857,16 +857,16 @@ public class TcpClientDiscoverySpi extends GridTcpDiscoverySpiAdapter implements if (leaveLatch != null) return; - GridTcpDiscoveryNode node = msg.node(); + TcpDiscoveryNode node = msg.node(); UUID newNodeId = node.id(); if (locNodeId.equals(newNodeId)) { if (joinLatch.getCount() > 0) { - Collection<GridTcpDiscoveryNode> top = msg.topology(); + Collection<TcpDiscoveryNode> top = msg.topology(); if (top != null) { - for (GridTcpDiscoveryNode n : top) { + for (TcpDiscoveryNode n : top) { if (n.order() > 0) n.visible(true); @@ -934,7 +934,7 @@ public class TcpClientDiscoverySpi extends GridTcpDiscoverySpiAdapter implements "[msg=" + msg + ", locNode=" + locNode + ']'); } else { - GridTcpDiscoveryNode node = rmtNodes.get(msg.nodeId()); + TcpDiscoveryNode node = rmtNodes.get(msg.nodeId()); if (node == null) { if (log.isDebugEnabled()) @@ -984,7 +984,7 @@ public class TcpClientDiscoverySpi extends GridTcpDiscoverySpiAdapter implements if (leaveLatch != null) return; - GridTcpDiscoveryNode node = rmtNodes.remove(msg.creatorNodeId()); + TcpDiscoveryNode node = rmtNodes.remove(msg.creatorNodeId()); if (node == null) { if (log.isDebugEnabled()) @@ -1016,7 +1016,7 @@ public class TcpClientDiscoverySpi extends GridTcpDiscoverySpiAdapter implements return; if (!locNodeId.equals(msg.creatorNodeId())) { - GridTcpDiscoveryNode node = rmtNodes.remove(msg.failedNodeId()); + TcpDiscoveryNode node = rmtNodes.remove(msg.failedNodeId()); if (node == null) { if (log.isDebugEnabled()) @@ -1142,7 +1142,7 @@ public class TcpClientDiscoverySpi extends GridTcpDiscoverySpiAdapter implements assert nodeId != null; assert metrics != null; - GridTcpDiscoveryNode node = nodeId.equals(locNodeId) ? locNode : rmtNodes.get(nodeId); + TcpDiscoveryNode node = nodeId.equals(locNodeId) ? locNode : rmtNodes.get(nodeId); if (node != null && node.visible()) { node.setMetrics(metrics); @@ -1186,7 +1186,7 @@ public class TcpClientDiscoverySpi extends GridTcpDiscoverySpiAdapter implements private Collection<ClusterNode> allNodes() { Collection<ClusterNode> allNodes = new TreeSet<>(); - for (GridTcpDiscoveryNode node : rmtNodes.values()) { + for (TcpDiscoveryNode node : rmtNodes.values()) { if (node.visible()) allNodes.add(node); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/191aae27/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpDiscoverySpi.java index 34b0ad4..4541e94 100644 --- a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpDiscoverySpi.java @@ -46,7 +46,7 @@ import java.util.concurrent.*; import static org.apache.ignite.events.IgniteEventType.*; import static org.gridgain.grid.kernal.GridNodeAttributes.*; import static org.apache.ignite.spi.IgnitePortProtocol.*; -import static org.gridgain.grid.spi.discovery.tcp.internal.GridTcpDiscoverySpiState.*; +import static org.gridgain.grid.spi.discovery.tcp.internal.TcpDiscoverySpiState.*; import static org.gridgain.grid.spi.discovery.tcp.messages.GridTcpDiscoveryHeartbeatMessage.*; import static org.gridgain.grid.spi.discovery.tcp.messages.GridTcpDiscoveryStatusCheckMessage.*; @@ -139,7 +139,7 @@ import static org.gridgain.grid.spi.discovery.tcp.messages.GridTcpDiscoveryStatu @IgniteSpiMultipleInstancesSupport(true) @DiscoverySpiOrderSupport(true) @DiscoverySpiHistorySupport(true) -public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridTcpDiscoverySpiMBean { +public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscoverySpiMBean { /** Default local port range (value is <tt>100</tt>). */ public static final int DFLT_PORT_RANGE = 100; @@ -207,7 +207,7 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT /** Nodes ring. */ @GridToStringExclude - private final GridTcpDiscoveryNodesRing ring = new GridTcpDiscoveryNodesRing(); + private final TcpDiscoveryNodesRing ring = new TcpDiscoveryNodesRing(); /** Topology snapshots history. */ private final SortedMap<Long, Collection<ClusterNode>> topHist = new TreeMap<>(); @@ -242,10 +242,10 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT private StatisticsPrinter statsPrinter; /** Failed nodes (but still in topology). */ - private Collection<GridTcpDiscoveryNode> failedNodes = new HashSet<>(); + private Collection<TcpDiscoveryNode> failedNodes = new HashSet<>(); /** Leaving nodes (but still in topology). */ - private Collection<GridTcpDiscoveryNode> leavingNodes = new HashSet<>(); + private Collection<TcpDiscoveryNode> leavingNodes = new HashSet<>(); /** If non-shared IP finder is used this flag shows whether IP finder contains local address. */ private boolean ipFinderHasLocAddr; @@ -374,7 +374,7 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT /** {@inheritDoc} */ @Override public int getLocalPort() { - GridTcpDiscoveryNode locNode0 = locNode; + TcpDiscoveryNode locNode0 = locNode; return locNode0 != null ? locNode0.discoveryPort() : 0; } @@ -604,7 +604,7 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT /** {@inheritDoc} */ @Nullable @Override public UUID getCoordinator() { - GridTcpDiscoveryNode crd = resolveCoordinator(); + TcpDiscoveryNode crd = resolveCoordinator(); return crd != null ? crd.id() : null; } @@ -619,7 +619,7 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT // Return local node directly. return locNode; - GridTcpDiscoveryNode node = ring.node(nodeId); + TcpDiscoveryNode node = ring.node(nodeId); if (node != null && !node.visible()) return null; @@ -629,7 +629,7 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT /** {@inheritDoc} */ @Override public Collection<ClusterNode> getRemoteNodes() { - return F.<GridTcpDiscoveryNode, ClusterNode>upcast(ring.visibleRemoteNodes()); + return F.<TcpDiscoveryNode, ClusterNode>upcast(ring.visibleRemoteNodes()); } /** {@inheritDoc} */ @@ -694,7 +694,7 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT throw new IgniteSpiException("Failed to resolve local host to set of external addresses: " + locHost, e); } - locNode = new GridTcpDiscoveryNode( + locNode = new TcpDiscoveryNode( locNodeId, addrs.get1(), addrs.get2(), @@ -853,7 +853,7 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT if (hbFreq < 2000) U.warn(log, "Heartbeat frequency is too high (at least 2000 ms recommended): " + hbFreq); - registerMBean(gridName, this, GridTcpDiscoverySpiMBean.class); + registerMBean(gridName, this, TcpDiscoverySpiMBean.class); if (ipFinder instanceof GridTcpDiscoveryMulticastIpFinder) { GridTcpDiscoveryMulticastIpFinder mcastIpFinder = ((GridTcpDiscoveryMulticastIpFinder)ipFinder); @@ -988,7 +988,7 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT if (ipFinder != null) ipFinder.close(); - Collection<GridTcpDiscoveryNode> rmts = null; + Collection<TcpDiscoveryNode> rmts = null; if (!disconnect) { // This is final stop. @@ -1015,7 +1015,7 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT if (lsnr != null) { Collection<ClusterNode> processed = new LinkedList<>(); - for (GridTcpDiscoveryNode n : rmts) { + for (TcpDiscoveryNode n : rmts) { assert n.visible(); processed.add(n); @@ -1087,7 +1087,7 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT if (nodeId == locNodeId) return true; - GridTcpDiscoveryNode node = ring.node(nodeId); + TcpDiscoveryNode node = ring.node(nodeId); if (node == null || !node.visible()) return false; @@ -1109,7 +1109,7 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT * @param node Node. * @return {@code True} if ping succeeds. */ - private boolean pingNode(GridTcpDiscoveryNode node) { + private boolean pingNode(TcpDiscoveryNode node) { assert node != null; if (node.id().equals(locNodeId)) @@ -1649,7 +1649,7 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT * @param node Node to marshall credentials for. * @throws org.apache.ignite.spi.IgniteSpiException If marshalling failed. */ - private void marshalCredentials(GridTcpDiscoveryNode node) throws IgniteSpiException { + private void marshalCredentials(TcpDiscoveryNode node) throws IgniteSpiException { try { // Use security-unsafe getter. Map<String, Object> attrs = new HashMap<>(node.getAttributes()); @@ -1671,7 +1671,7 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT * @return Security credentials. * @throws org.apache.ignite.spi.IgniteSpiException If unmarshal fails. */ - private GridSecurityCredentials unmarshalCredentials(GridTcpDiscoveryNode node) throws IgniteSpiException { + private GridSecurityCredentials unmarshalCredentials(TcpDiscoveryNode node) throws IgniteSpiException { try { byte[] credBytes = (byte[])node.getAttributes().get(GridNodeAttributes.ATTR_SECURITY_CREDENTIALS); @@ -1709,20 +1709,20 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT * @param topVer Topology version. * @param node Remote node this event is connected with. */ - private void notifyDiscovery(int type, long topVer, GridTcpDiscoveryNode node) { + private void notifyDiscovery(int type, long topVer, TcpDiscoveryNode node) { assert type > 0; assert node != null; DiscoverySpiListener lsnr = this.lsnr; - GridTcpDiscoverySpiState spiState = spiStateCopy(); + TcpDiscoverySpiState spiState = spiStateCopy(); if (lsnr != null && node.visible() && (spiState == CONNECTED || spiState == DISCONNECTING)) { if (log.isDebugEnabled()) log.debug("Discovery notification [node=" + node + ", spiState=" + spiState + ", type=" + U.gridEventName(type) + ", topVer=" + topVer + ']'); - Collection<ClusterNode> top = F.<GridTcpDiscoveryNode, ClusterNode>upcast(ring.visibleNodes()); + Collection<ClusterNode> top = F.<TcpDiscoveryNode, ClusterNode>upcast(ring.visibleNodes()); Map<Long, Collection<ClusterNode>> hist = updateTopologyHistory(topVer, top); @@ -1763,7 +1763,7 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT * Internal addresses placed before external addresses. */ @SuppressWarnings("TypeMayBeWeakened") - private LinkedHashSet<InetSocketAddress> getNodeAddresses(GridTcpDiscoveryNode node) { + private LinkedHashSet<InetSocketAddress> getNodeAddresses(TcpDiscoveryNode node) { LinkedHashSet<InetSocketAddress> res = new LinkedHashSet<>(node.socketAddresses()); Collection<InetSocketAddress> extAddrs = node.attribute(createSpiAttributeName(ATTR_EXT_ADDRS)); @@ -1782,7 +1782,7 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT * Internal addresses will be sorted with {@code inetAddressesComparator(sameHost)}. */ @SuppressWarnings("TypeMayBeWeakened") - private LinkedHashSet<InetSocketAddress> getNodeAddresses(GridTcpDiscoveryNode node, boolean sameHost) { + private LinkedHashSet<InetSocketAddress> getNodeAddresses(TcpDiscoveryNode node, boolean sameHost) { List<InetSocketAddress> addrs = U.arrayList(node.socketAddresses()); Collections.sort(addrs, U.inetAddressesComparator(sameHost)); @@ -1817,8 +1817,8 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT /** * @return Spi state copy. */ - private GridTcpDiscoverySpiState spiStateCopy() { - GridTcpDiscoverySpiState state; + private TcpDiscoverySpiState spiStateCopy() { + TcpDiscoverySpiState state; synchronized (mux) { state = spiState; @@ -1834,7 +1834,7 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT * @return Coordinator node or {@code null} if there are no coordinator * (i.e. local node is the last one and is currently stopping). */ - @Nullable private GridTcpDiscoveryNode resolveCoordinator() { + @Nullable private TcpDiscoveryNode resolveCoordinator() { return resolveCoordinator(null); } @@ -1846,10 +1846,10 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT * @return Coordinator node or {@code null} if there are no coordinator * (i.e. local node is the last one and is currently stopping). */ - @Nullable private GridTcpDiscoveryNode resolveCoordinator( - @Nullable Collection<GridTcpDiscoveryNode> filter) { + @Nullable private TcpDiscoveryNode resolveCoordinator( + @Nullable Collection<TcpDiscoveryNode> filter) { synchronized (mux) { - Collection<GridTcpDiscoveryNode> excluded = F.concat(false, failedNodes, leavingNodes); + Collection<TcpDiscoveryNode> excluded = F.concat(false, failedNodes, leavingNodes); if (!F.isEmpty(filter)) excluded = F.concat(false, excluded, filter); @@ -1873,7 +1873,7 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT Runtime runtime = Runtime.getRuntime(); - GridTcpDiscoveryNode coord = resolveCoordinator(); + TcpDiscoveryNode coord = resolveCoordinator(); log.info("Discovery SPI statistics [statistics=" + stats + ", spiState=" + spiStateCopy() + ", coord=" + coord + @@ -1899,13 +1899,13 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT if (msg instanceof GridTcpDiscoveryNodeAddedMessage) { GridTcpDiscoveryNodeAddedMessage nodeAddedMsg = (GridTcpDiscoveryNodeAddedMessage)msg; - GridTcpDiscoveryNode node = nodeAddedMsg.node(); + TcpDiscoveryNode node = nodeAddedMsg.node(); if (node.id().equals(destNodeId)) { - Collection<GridTcpDiscoveryNode> allNodes = ring.allNodes(); - Collection<GridTcpDiscoveryNode> topToSend = new ArrayList<>(allNodes.size()); + Collection<TcpDiscoveryNode> allNodes = ring.allNodes(); + Collection<TcpDiscoveryNode> topToSend = new ArrayList<>(allNodes.size()); - for (GridTcpDiscoveryNode n0 : allNodes) { + for (TcpDiscoveryNode n0 : allNodes) { assert n0.internalOrder() != 0 : n0; // Skip next node and nodes added after next @@ -1995,7 +1995,7 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT void forceNextNodeFailure() { U.warn(log, "Next node will be forcibly failed (if any)."); - GridTcpDiscoveryNode next; + TcpDiscoveryNode next; synchronized (mux) { next = ring.nextNode(failedNodes); @@ -2023,7 +2023,7 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT * * @return Nodes ring. */ - GridTcpDiscoveryNodesRing ring() { + TcpDiscoveryNodesRing ring() { return ring; } @@ -2083,14 +2083,14 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT b.append("Leaving nodes: ").append(U.nl()); - for (GridTcpDiscoveryNode node : leavingNodes) + for (TcpDiscoveryNode node : leavingNodes) b.append(" ").append(node.id()).append(U.nl()); b.append(U.nl()); b.append("Failed nodes: ").append(U.nl()); - for (GridTcpDiscoveryNode node : failedNodes) + for (TcpDiscoveryNode node : failedNodes) b.append(" ").append(node.id()).append(U.nl()); b.append(U.nl()); @@ -2319,8 +2319,8 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT Collection<InetSocketAddress> currAddrs = F.flatCollections( F.viewReadOnly( ring.allNodes(), - new C1<GridTcpDiscoveryNode, Collection<InetSocketAddress>>() { - @Override public Collection<InetSocketAddress> apply(GridTcpDiscoveryNode node) { + new C1<TcpDiscoveryNode, Collection<InetSocketAddress>>() { + @Override public Collection<InetSocketAddress> apply(TcpDiscoveryNode node) { return !node.isClient() ? getNodeAddresses(node) : Collections.<InetSocketAddress>emptyList(); } @@ -2490,7 +2490,7 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT private class RingMessageWorker extends MessageWorkerAdapter { /** Next node. */ @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) - private GridTcpDiscoveryNode next; + private TcpDiscoveryNode next; /** Pending messages. */ private final PendingMessages pendingMsgs = new PendingMessages(); @@ -2585,9 +2585,9 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT clientMsgWorker.addMessage(msg); } - Collection<GridTcpDiscoveryNode> failedNodes; + Collection<TcpDiscoveryNode> failedNodes; - GridTcpDiscoverySpiState state; + TcpDiscoverySpiState state; synchronized (mux) { failedNodes = U.arrayList(TcpDiscoverySpi.this.failedNodes); @@ -2603,7 +2603,7 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT while (true) { if (searchNext) { - GridTcpDiscoveryNode newNext = ring.nextNode(failedNodes); + TcpDiscoveryNode newNext = ring.nextNode(failedNodes); if (newNext == null) { if (log.isDebugEnabled()) @@ -2950,7 +2950,7 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT TcpDiscoverySpi.this.failedNodes.addAll(failedNodes); } - for (GridTcpDiscoveryNode n : failedNodes) + for (TcpDiscoveryNode n : failedNodes) msgWorker.addMessage(new GridTcpDiscoveryNodeFailedMessage(locNodeId, n.id(), n.internalOrder())); } } @@ -2989,7 +2989,7 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT private void processJoinRequestMessage(GridTcpDiscoveryJoinRequestMessage msg) { assert msg != null; - GridTcpDiscoveryNode node = msg.node(); + TcpDiscoveryNode node = msg.node(); if (!msg.client()) { boolean rmtHostLoopback = node.socketAddresses().size() == 1 && @@ -3031,7 +3031,7 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT } if (isLocalNodeCoordinator()) { - GridTcpDiscoveryNode existingNode = ring.node(node.id()); + TcpDiscoveryNode existingNode = ring.node(node.id()); if (existingNode != null) { if (!node.socketAddresses().equals(existingNode.socketAddresses())) { @@ -3397,10 +3397,10 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT * @param msg Message. * @throws org.apache.ignite.spi.IgniteSpiException Last failure if all attempts failed. */ - private void trySendMessageDirectly(GridTcpDiscoveryNode node, GridTcpDiscoveryAbstractMessage msg) + private void trySendMessageDirectly(TcpDiscoveryNode node, GridTcpDiscoveryAbstractMessage msg) throws IgniteSpiException { if (node.isClient()) { - GridTcpDiscoveryNode routerNode = ring.node(node.clientRouterNodeId()); + TcpDiscoveryNode routerNode = ring.node(node.clientRouterNodeId()); if (routerNode == null) throw new IgniteSpiException("Router node for client does not exist: " + node); @@ -3447,7 +3447,7 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT else { UUID nodeId = msg.creatorNodeId(); - GridTcpDiscoveryNode node = ring.node(nodeId); + TcpDiscoveryNode node = ring.node(nodeId); assert node == null || node.isClient(); @@ -3505,7 +3505,7 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT private void processNodeAddedMessage(GridTcpDiscoveryNodeAddedMessage msg) { assert msg != null; - GridTcpDiscoveryNode node = msg.node(); + TcpDiscoveryNode node = msg.node(); assert node != null; @@ -3635,12 +3635,12 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT synchronized (mux) { if (spiState == CONNECTING && locNode.internalOrder() != node.internalOrder()) { // Initialize topology. - Collection<GridTcpDiscoveryNode> top = msg.topology(); + Collection<TcpDiscoveryNode> top = msg.topology(); if (top != null && !top.isEmpty()) { gridStartTime = msg.gridStartTime(); - for (GridTcpDiscoveryNode n : top) { + for (TcpDiscoveryNode n : top) { // Make all preceding nodes and local node visible. n.visible(true); } @@ -3710,7 +3710,7 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT assert nodeId != null; - GridTcpDiscoveryNode node = ring.node(nodeId); + TcpDiscoveryNode node = ring.node(nodeId); if (node == null) { if (log.isDebugEnabled()) @@ -3876,7 +3876,7 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT return; } - GridTcpDiscoveryNode leavingNode = ring.node(leavingNodeId); + TcpDiscoveryNode leavingNode = ring.node(leavingNodeId); if (leavingNode != null) { synchronized (mux) { @@ -3905,7 +3905,7 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT } if (msg.verified() && !locNodeId.equals(leavingNodeId)) { - GridTcpDiscoveryNode leftNode = ring.removeNode(leavingNodeId); + TcpDiscoveryNode leftNode = ring.removeNode(leavingNodeId); assert leftNode != null; @@ -4016,7 +4016,7 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT UUID sndId = msg.senderNodeId(); if (sndId != null) { - GridTcpDiscoveryNode sndNode = ring.node(sndId); + TcpDiscoveryNode sndNode = ring.node(sndId); if (sndNode == null) { if (log.isDebugEnabled()) @@ -4043,7 +4043,7 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT UUID nodeId = msg.failedNodeId(); long order = msg.order(); - GridTcpDiscoveryNode node = ring.node(nodeId); + TcpDiscoveryNode node = ring.node(nodeId); if (node != null && node.internalOrder() != order) { if (log.isDebugEnabled()) @@ -4335,7 +4335,7 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT Collection<UUID> clientNodeIds = msg.clientNodeIds(); - for (GridTcpDiscoveryNode clientNode : ring.clientNodes()) { + for (TcpDiscoveryNode clientNode : ring.clientNodes()) { if (clientNode.visible()) { if (clientNodeIds.contains(clientNode.id())) clientNode.aliveCheck(maxMissedClientHbs); @@ -4370,7 +4370,7 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT assert nodeId != null; assert metrics != null; - GridTcpDiscoveryNode node = ring.node(nodeId); + TcpDiscoveryNode node = ring.node(nodeId); if (node != null) { node.setMetrics(metrics); @@ -4733,7 +4733,7 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT } else if (msg instanceof GridTcpDiscoveryClientReconnectMessage) { if (client) { - GridTcpDiscoverySpiState state = spiStateCopy(); + TcpDiscoverySpiState state = spiStateCopy(); if (state == CONNECTED) { writeToSocket(sock, RES_OK); @@ -4755,7 +4755,7 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT boolean ignored = false; - GridTcpDiscoverySpiState state = null; + TcpDiscoverySpiState state = null; synchronized (mux) { if (spiState == CONNECTING) { @@ -4784,7 +4784,7 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT boolean ignored = false; - GridTcpDiscoverySpiState state = null; + TcpDiscoverySpiState state = null; synchronized (mux) { if (spiState == CONNECTING) { @@ -4813,7 +4813,7 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT boolean ignored = false; - GridTcpDiscoverySpiState state = null; + TcpDiscoverySpiState state = null; synchronized (mux) { if (spiState == CONNECTING) { @@ -4842,7 +4842,7 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT boolean ignored = false; - GridTcpDiscoverySpiState state = null; + TcpDiscoverySpiState state = null; synchronized (mux) { if (spiState == CONNECTING) { @@ -4935,7 +4935,7 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT */ private boolean nodeAlive(UUID nodeId) { // Is node alive or about to be removed from the ring? - GridTcpDiscoveryNode node = ring.node(nodeId); + TcpDiscoveryNode node = ring.node(nodeId); boolean nodeAlive = node != null && node.visible(); @@ -4959,7 +4959,7 @@ public class TcpDiscoverySpi extends GridTcpDiscoverySpiAdapter implements GridT assert msg != null; assert !msg.responded(); - GridTcpDiscoverySpiState state = spiStateCopy(); + TcpDiscoverySpiState state = spiStateCopy(); if (state == CONNECTED) { writeToSocket(sock, RES_OK);