Repository: incubator-ignite Updated Branches: refs/heads/master b7a633832 -> 12837f35b
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/191aae27/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/GridTcpDiscoveryStatistics.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/GridTcpDiscoveryStatistics.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/GridTcpDiscoveryStatistics.java deleted file mode 100644 index 3ba21f0..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/GridTcpDiscoveryStatistics.java +++ /dev/null @@ -1,639 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.discovery.tcp.internal; - -import org.apache.ignite.lang.*; -import org.gridgain.grid.spi.discovery.tcp.messages.*; -import org.gridgain.grid.util.*; -import org.gridgain.grid.util.typedef.*; -import org.gridgain.grid.util.typedef.internal.*; -import org.gridgain.grid.util.tostring.*; - -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -/** - * Statistics for {@link org.gridgain.grid.spi.discovery.tcp.TcpDiscoverySpi}. - */ -public class GridTcpDiscoveryStatistics { - /** Join started timestamp. */ - private long joinStartedTs; - - /** Join finished timestamp. */ - private long joinFinishedTs; - - /** Coordinator since timestamp. */ - private final AtomicLong crdSinceTs = new AtomicLong(); - - /** Joined nodes count. */ - private int joinedNodesCnt; - - /** Failed nodes count. */ - private int failedNodesCnt; - - /** Left nodes count. */ - private int leftNodesCnt; - - /** Ack timeouts count. 
*/ - private int ackTimeoutsCnt; - - /** Socket timeouts count. */ - private int sockTimeoutsCnt; - - /** Received messages. */ - @GridToStringInclude - private final Map<String, Integer> rcvdMsgs = new HashMap<>(); - - /** Processed messages. */ - @GridToStringInclude - private final Map<String, Integer> procMsgs = new HashMap<>(); - - /** Average time taken to serialize messages. */ - @GridToStringInclude - private final Map<String, Long> avgMsgsSndTimes = new HashMap<>(); - - /** Average time taken to serialize messages. */ - @GridToStringInclude - private final Map<String, Long> maxMsgsSndTimes = new HashMap<>(); - - /** Sent messages. */ - @GridToStringInclude - private final Map<String, Integer> sentMsgs = new HashMap<>(); - - /** Messages receive timestamps. */ - private final Map<IgniteUuid, Long> msgsRcvTs = new GridBoundedLinkedHashMap<>(1024); - - /** Messages processing start timestamps. */ - private final Map<IgniteUuid, Long> msgsProcStartTs = new GridBoundedLinkedHashMap<>(1024); - - /** Ring messages sent timestamps. */ - private final Map<IgniteUuid, Long> ringMsgsSndTs = new GridBoundedLinkedHashMap<>(1024); - - /** Average time messages is in queue. */ - private long avgMsgQueueTime; - - /** Max time messages is in queue. */ - private long maxMsgQueueTime; - - /** Total number of ring messages sent. */ - private int ringMsgsSent; - - /** Average time it takes for messages to pass the full ring. */ - private long avgRingMsgTime; - - /** Max time it takes for messages to pass the full ring. */ - private long maxRingMsgTime; - - /** Class name of ring message that required the biggest time for full ring traverse. */ - private String maxRingTimeMsgCls; - - /** Average message processing time. */ - private long avgMsgProcTime; - - /** Max message processing time. */ - private long maxMsgProcTime; - - /** Class name of the message that required the biggest time to process. */ - private String maxProcTimeMsgCls; - - /** Socket readers created count. 
*/ - private int sockReadersCreated; - - /** Socket readers removed count. */ - private int sockReadersRmv; - - /** Average time it takes to initialize connection from another node. */ - private long avgSrvSockInitTime; - - /** Max time it takes to initialize connection from another node. */ - private long maxSrvSockInitTime; - - /** Number of outgoing connections established. */ - private int clientSockCreatedCnt; - - /** Average time it takes to connect to another node. */ - private long avgClientSockInitTime; - - /** Max time it takes to connect to another node. */ - private long maxClientSockInitTime; - - /** Pending messages registered count. */ - private int pendingMsgsRegistered; - - /** Pending messages discarded count. */ - private int pendingMsgsDiscarded; - - /** - * Increments joined nodes count. - */ - public synchronized void onNodeJoined() { - joinedNodesCnt++; - } - - /** - * Increments left nodes count. - */ - public synchronized void onNodeLeft() { - leftNodesCnt++; - } - - /** - * Increments failed nodes count. - */ - public synchronized void onNodeFailed() { - failedNodesCnt++; - } - - /** - * Increments ack timeouts count. - */ - public synchronized void onAckTimeout() { - ackTimeoutsCnt++; - } - - /** - * Increments socket timeouts count. - */ - public synchronized void onSocketTimeout() { - sockTimeoutsCnt++; - } - - /** - * Initializes coordinator since date (if needed). - */ - public void onBecomingCoordinator() { - crdSinceTs.compareAndSet(0, U.currentTimeMillis()); - } - - /** - * Initializes join started timestamp. - */ - public synchronized void onJoinStarted() { - joinStartedTs = U.currentTimeMillis(); - } - - /** - * Initializes join finished timestamp. - */ - public synchronized void onJoinFinished() { - joinFinishedTs = U.currentTimeMillis(); - } - - /** - * @return Join started timestamp. - */ - public synchronized long joinStarted() { - return joinStartedTs; - } - - /** - * @return Join finished timestamp. 
- */ - public synchronized long joinFinished() { - return joinFinishedTs; - } - - /** - * Collects necessary stats for message received by SPI. - * - * @param msg Received message. - */ - public synchronized void onMessageReceived(GridTcpDiscoveryAbstractMessage msg) { - assert msg != null; - - Integer cnt = F.addIfAbsent(rcvdMsgs, msg.getClass().getSimpleName(), new Callable<Integer>() { - @Override public Integer call() { - return 0; - } - }); - - assert cnt != null; - - rcvdMsgs.put(msg.getClass().getSimpleName(), ++cnt); - - msgsRcvTs.put(msg.id(), U.currentTimeMillis()); - } - - /** - * Collects necessary stats for message processed by SPI. - * - * @param msg Processed message. - */ - public synchronized void onMessageProcessingStarted(GridTcpDiscoveryAbstractMessage msg) { - assert msg != null; - - Integer cnt = F.addIfAbsent(procMsgs, msg.getClass().getSimpleName(), new Callable<Integer>() { - @Override public Integer call() { - return 0; - } - }); - - assert cnt != null; - - procMsgs.put(msg.getClass().getSimpleName(), ++cnt); - - Long rcvdTs = msgsRcvTs.remove(msg.id()); - - if (rcvdTs != null) { - long duration = U.currentTimeMillis() - rcvdTs; - - if (maxMsgQueueTime < duration) - maxMsgQueueTime = duration; - - avgMsgQueueTime = (avgMsgQueueTime * (totalReceivedMessages() -1)) / totalProcessedMessages(); - } - - msgsProcStartTs.put(msg.id(), U.currentTimeMillis()); - } - - /** - * Collects necessary stats for message processed by SPI. - * - * @param msg Processed message. 
- */ - public synchronized void onMessageProcessingFinished(GridTcpDiscoveryAbstractMessage msg) { - assert msg != null; - - Long startTs = msgsProcStartTs.get(msg.id()); - - if (startTs != null) { - long duration = U.currentTimeMillis() - startTs; - - avgMsgProcTime = (avgMsgProcTime * (totalProcessedMessages() - 1) + duration) / totalProcessedMessages(); - - if (duration > maxMsgProcTime) { - maxMsgProcTime = duration; - - maxProcTimeMsgCls = msg.getClass().getSimpleName(); - } - - msgsProcStartTs.remove(msg.id()); - } - } - - /** - * Called by coordinator when ring message is sent. - * - * @param msg Sent message. - * @param time Time taken to serialize message. - */ - public synchronized void onMessageSent(GridTcpDiscoveryAbstractMessage msg, long time) { - assert msg != null; - assert time >= 0; - - if (crdSinceTs.get() > 0 && - (msg instanceof GridTcpDiscoveryNodeAddedMessage) || - (msg instanceof GridTcpDiscoveryNodeLeftMessage) || - (msg instanceof GridTcpDiscoveryNodeFailedMessage)) { - ringMsgsSndTs.put(msg.id(), U.currentTimeMillis()); - - ringMsgsSent++; - } - - Integer cnt = F.addIfAbsent(sentMsgs, msg.getClass().getSimpleName(), new Callable<Integer>() { - @Override public Integer call() { - return 0; - } - }); - - assert cnt != null; - - sentMsgs.put(msg.getClass().getSimpleName(), ++cnt); - - Long avgTime = F.addIfAbsent(avgMsgsSndTimes, msg.getClass().getSimpleName(), new Callable<Long>() { - @Override public Long call() { - return 0L; - } - }); - - assert avgTime != null; - - avgTime = (avgTime * (cnt - 1) + time) / cnt; - - avgMsgsSndTimes.put(msg.getClass().getSimpleName(), avgTime); - - Long maxTime = F.addIfAbsent(maxMsgsSndTimes, msg.getClass().getSimpleName(), new Callable<Long>() { - @Override public Long call() { - return 0L; - } - }); - - assert maxTime != null; - - if (time > maxTime) - maxMsgsSndTimes.put(msg.getClass().getSimpleName(), time); - } - - /** - * Called by coordinator when ring message makes full pass. 
- * - * @param msg Message. - */ - public synchronized void onRingMessageReceived(GridTcpDiscoveryAbstractMessage msg) { - assert msg != null; - - Long sentTs = ringMsgsSndTs.get(msg.id()); - - if (sentTs != null) { - long duration = U.currentTimeMillis() - sentTs; - - if (maxRingMsgTime < duration) { - maxRingMsgTime = duration; - - maxRingTimeMsgCls = msg.getClass().getSimpleName(); - } - - if (ringMsgsSent != 0) - avgRingMsgTime = (avgRingMsgTime * (ringMsgsSent - 1) + duration) / ringMsgsSent; - } - } - - /** - * Gets max time for ring message to make full pass. - * - * @return Max full pass time. - */ - public synchronized long maxRingMessageTime() { - return maxRingMsgTime; - } - - /** - * Gets class name of the message that took max time to make full pass. - * - * @return Message class name. - */ - public synchronized String maxRingDurationMessageClass() { - return maxRingTimeMsgCls; - } - - /** - * Gets class name of the message took max time to process. - * - * @return Message class name. - */ - public synchronized String maxProcessingTimeMessageClass() { - return maxProcTimeMsgCls; - } - - /** - * @param initTime Time socket was initialized in. - */ - public synchronized void onServerSocketInitialized(long initTime) { - assert initTime >= 0; - - if (maxSrvSockInitTime < initTime) - maxSrvSockInitTime = initTime; - - avgSrvSockInitTime = (avgSrvSockInitTime * (sockReadersCreated - 1) + initTime) / sockReadersCreated; - } - - /** - * @param initTime Time socket was initialized in. - */ - public synchronized void onClientSocketInitialized(long initTime) { - assert initTime >= 0; - - clientSockCreatedCnt++; - - if (maxClientSockInitTime < initTime) - maxClientSockInitTime = initTime; - - avgClientSockInitTime = (avgClientSockInitTime * (clientSockCreatedCnt - 1) + initTime) / clientSockCreatedCnt; - } - - /** - * Increments pending messages registered count. 
- */ - public synchronized void onPendingMessageRegistered() { - pendingMsgsRegistered++; - } - - /** - * Increments pending messages discarded count. - */ - public synchronized void onPendingMessageDiscarded() { - pendingMsgsDiscarded++; - } - - /** - * Increments socket readers created count. - */ - public synchronized void onSocketReaderCreated() { - sockReadersCreated++; - } - - /** - * Increments socket readers removed count. - */ - public synchronized void onSocketReaderRemoved() { - sockReadersRmv++; - } - - /** - * Gets processed messages counts (grouped by type). - * - * @return Map containing message types and respective counts. - */ - public synchronized Map<String, Integer> processedMessages() { - return new HashMap<>(procMsgs); - } - - /** - * Gets received messages counts (grouped by type). - * - * @return Map containing message types and respective counts. - */ - public synchronized Map<String, Integer> receivedMessages() { - return new HashMap<>(rcvdMsgs); - } - - /** - * Gets max messages send time (grouped by type). - * - * @return Map containing messages types and max send times. - */ - public synchronized Map<String, Long> maxMessagesSendTimes() { - return new HashMap<>(maxMsgsSndTimes); - } - - /** - * Gets average messages send time (grouped by type). - * - * @return Map containing messages types and average send times. - */ - public synchronized Map<String, Long> avgMessagesSendTimes() { - return new HashMap<>(avgMsgsSndTimes); - } - - /** - * Gets total received messages count. - * - * @return Total received messages count. - */ - public synchronized int totalReceivedMessages() { - return F.sumInt(receivedMessages().values()); - } - - /** - * Gets total processed messages count. - * - * @return Total processed messages count. - */ - public synchronized int totalProcessedMessages() { - return F.sumInt(processedMessages().values()); - } - - /** - * Gets max message processing time. - * - * @return Max message processing time. 
- */ - public synchronized long maxMessageProcessingTime(){ - return maxMsgProcTime; - } - - /** - * Gets average message processing time. - * - * @return Average message processing time. - */ - public synchronized long avgMessageProcessingTime() { - return avgMsgProcTime; - } - - /** - * Gets pending messages registered count. - * - * @return Pending messages registered count. - */ - public synchronized long pendingMessagesRegistered() { - return pendingMsgsRegistered; - } - - /** - * Gets pending messages discarded count. - * - * @return Pending messages registered count. - */ - public synchronized long pendingMessagesDiscarded() { - return pendingMsgsDiscarded; - } - - /** - * Gets nodes joined count. - * - * @return Nodes joined count. - */ - public synchronized int joinedNodesCount() { - return joinedNodesCnt; - } - - /** - * Gets nodes left count. - * - * @return Nodes left count. - */ - public synchronized int leftNodesCount() { - return leftNodesCnt; - } - - /** - * Gets failed nodes count. - * - * @return Failed nodes count. - */ - public synchronized int failedNodesCount() { - return failedNodesCnt; - } - - /** - * @return Ack timeouts count. - */ - public synchronized int ackTimeoutsCount() { - return ackTimeoutsCnt; - } - - /** - * @return Socket timeouts count. - */ - public synchronized int socketTimeoutsCount() { - return sockTimeoutsCnt; - } - - /** - * Gets socket readers created count. - * - * @return Socket readers created count. - */ - public synchronized int socketReadersCreated() { - return sockReadersCreated; - } - - /** - * Gets socket readers removed count. - * - * @return Socket readers removed count. - */ - public synchronized int socketReadersRemoved() { - return sockReadersRmv; - } - - /** - * Gets time local node has been coordinator since. - * - * @return Coordinator since timestamp. - */ - public long coordinatorSinceTimestamp() { - return crdSinceTs.get(); - } - - /** - * Clears statistics. 
- */ - public synchronized void clear() { - ackTimeoutsCnt = 0; - avgClientSockInitTime = 0; - avgMsgProcTime = 0; - avgMsgQueueTime = 0; - avgMsgsSndTimes.clear(); - avgRingMsgTime = 0; - avgSrvSockInitTime = 0; - clientSockCreatedCnt = 0; - crdSinceTs.set(0); - failedNodesCnt = 0; - joinedNodesCnt = 0; - joinFinishedTs = 0; - joinStartedTs = 0; - leftNodesCnt = 0; - maxClientSockInitTime = 0; - maxMsgProcTime = 0; - maxMsgQueueTime = 0; - maxMsgsSndTimes.clear(); - maxProcTimeMsgCls = null; - maxRingMsgTime = 0; - maxRingTimeMsgCls = null; - maxSrvSockInitTime = 0; - pendingMsgsDiscarded = 0; - pendingMsgsRegistered = 0; - procMsgs.clear(); - rcvdMsgs.clear(); - ringMsgsSent = 0; - sentMsgs.clear(); - sockReadersCreated = 0; - sockReadersRmv = 0; - sockTimeoutsCnt = 0; - } - - /** {@inheritDoc} */ - @Override public synchronized String toString() { - return S.toString(GridTcpDiscoveryStatistics.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/191aae27/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/TcpDiscoveryNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/TcpDiscoveryNode.java new file mode 100644 index 0000000..fe2d419 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/TcpDiscoveryNode.java @@ -0,0 +1,443 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.spi.discovery.tcp.internal; + +import org.apache.ignite.cluster.*; +import org.apache.ignite.lang.*; +import 
org.apache.ignite.product.*; +import org.gridgain.grid.kernal.*; +import org.gridgain.grid.spi.discovery.*; +import org.gridgain.grid.util.lang.*; +import org.gridgain.grid.util.tostring.*; +import org.gridgain.grid.util.typedef.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.net.*; +import java.util.*; + +import static org.gridgain.grid.kernal.GridNodeAttributes.*; + +/** + * Node for {@link org.gridgain.grid.spi.discovery.tcp.TcpDiscoverySpi}. + * <p> + * <strong>This class is not intended for public use</strong> and has been made + * <tt>public</tt> due to certain limitations of Java technology. + */ +public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements ClusterNode, + Comparable<TcpDiscoveryNode>, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Node ID. */ + private UUID id; + + /** Consistent ID. */ + private Object consistentId; + + /** Node attributes. */ + @GridToStringExclude + private Map<String, Object> attrs; + + /** Internal discovery addresses as strings. */ + @GridToStringInclude + private Collection<String> addrs; + + /** Internal discovery host names as strings. */ + private Collection<String> hostNames; + + /** */ + @GridToStringInclude + private Collection<InetSocketAddress> sockAddrs; + + /** */ + @GridToStringInclude + private int discPort; + + /** Node metrics. */ + @GridToStringExclude + private volatile ClusterNodeMetrics metrics; + + /** Node order in the topology. */ + private volatile long order; + + /** Node order in the topology (internal). */ + private volatile long intOrder; + + /** The most recent time when heartbeat message was received from the node. */ + @GridToStringExclude + private volatile long lastUpdateTime = U.currentTimeMillis(); + + /** Metrics provider (transient). */ + @GridToStringExclude + private DiscoveryMetricsProvider metricsProvider; + + /** Visible flag (transient). 
*/ + @GridToStringExclude + private boolean visible; + + /** Grid local node flag (transient). */ + private boolean loc; + + /** Version. */ + private IgniteProductVersion ver; + + /** Alive check (used by clients). */ + @GridToStringExclude + private transient int aliveCheck; + + /** Client router node ID. */ + @GridToStringExclude + private UUID clientRouterNodeId; + + /** + * Public default no-arg constructor for {@link Externalizable} interface. + */ + public TcpDiscoveryNode() { + // No-op. + } + + /** + * Constructor. + * + * @param id Node Id. + * @param addrs Addresses. + * @param hostNames Host names. + * @param discPort Port. + * @param metricsProvider Metrics provider. + * @param ver Version. + */ + public TcpDiscoveryNode(UUID id, Collection<String> addrs, Collection<String> hostNames, int discPort, + DiscoveryMetricsProvider metricsProvider, IgniteProductVersion ver) { + assert id != null; + assert !F.isEmpty(addrs); + assert metricsProvider != null; + assert ver != null; + + this.id = id; + this.addrs = addrs; + this.hostNames = hostNames; + this.discPort = discPort; + this.metricsProvider = metricsProvider; + this.ver = ver; + + consistentId = U.consistentId(addrs, discPort); + + metrics = metricsProvider.getMetrics(); + sockAddrs = U.toSocketAddresses(this, discPort); + } + + /** {@inheritDoc} */ + @Override public UUID id() { + return id; + } + + /** {@inheritDoc} */ + @Override public Object consistentId() { + return consistentId; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public <T> T attribute(String name) { + // Even though discovery SPI removes this attribute after authentication, keep this check for safety. + if (GridNodeAttributes.ATTR_SECURITY_CREDENTIALS.equals(name)) + return null; + + return (T)attrs.get(name); + } + + /** {@inheritDoc} */ + @Override public Map<String, Object> attributes() { + // Even though discovery SPI removes this attribute after authentication, keep this check for safety. 
+ return F.view(attrs, new IgnitePredicate<String>() { + @Override public boolean apply(String s) { + return !GridNodeAttributes.ATTR_SECURITY_CREDENTIALS.equals(s); + } + }); + } + + /** + * Sets node attributes. + * + * @param attrs Node attributes. + */ + public void setAttributes(Map<String, Object> attrs) { + this.attrs = U.sealMap(attrs); + } + + /** + * Gets node attributes without filtering. + * + * @return Node attributes without filtering. + */ + public Map<String, Object> getAttributes() { + return attrs; + } + + /** {@inheritDoc} */ + @Override public ClusterNodeMetrics metrics() { + if (metricsProvider != null) + metrics = metricsProvider.getMetrics(); + + return metrics; + } + + /** + * Sets node metrics. + * + * @param metrics Node metrics. + */ + public void setMetrics(ClusterNodeMetrics metrics) { + assert metrics != null; + + this.metrics = metrics; + } + + /** + * @return Internal order. + */ + public long internalOrder() { + return intOrder; + } + + /** + * @param intOrder Internal order of the node. + */ + public void internalOrder(long intOrder) { + assert intOrder > 0; + + this.intOrder = intOrder; + } + + /** + * @return Order. + */ + @Override public long order() { + return order; + } + + /** + * @param order Order of the node. + */ + public void order(long order) { + assert order >= 0 : "Order is invalid: " + this; + + this.order = order; + } + + /** {@inheritDoc} */ + @Override public IgniteProductVersion version() { + return ver; + } + + /** + * @param ver Version. + */ + public void version(IgniteProductVersion ver) { + assert ver != null; + + this.ver = ver; + } + + /** {@inheritDoc} */ + @Override public Collection<String> addresses() { + return addrs; + } + + /** {@inheritDoc} */ + @Override public boolean isLocal() { + return loc; + } + + /** + * @param loc Grid local node flag. 
+ */ + public void local(boolean loc) { + this.loc = loc; + } + + /** {@inheritDoc} */ + @Override public boolean isDaemon() { + return "true".equalsIgnoreCase((String)attribute(ATTR_DAEMON)); + } + + /** {@inheritDoc} */ + @Override public Collection<String> hostNames() { + return hostNames; + } + + /** + * @return Discovery port. + */ + public int discoveryPort() { + return discPort; + } + + /** + * @return Addresses that could be used by discovery. + */ + public Collection<InetSocketAddress> socketAddresses() { + return sockAddrs; + } + + /** + * Gets node last update time. + * + * @return Time of the last heartbeat. + */ + public long lastUpdateTime() { + return lastUpdateTime; + } + + /** + * Sets node last update. + * + * @param lastUpdateTime Time of last metrics update. + */ + public void lastUpdateTime(long lastUpdateTime) { + assert lastUpdateTime > 0; + + this.lastUpdateTime = lastUpdateTime; + } + + /** + * Gets visible flag. + * + * @return {@code true} if node is in visible state. + */ + public boolean visible() { + return visible; + } + + /** + * Sets visible flag. + * + * @param visible {@code true} if node is in visible state. + */ + public void visible(boolean visible) { + this.visible = visible; + } + + /** {@inheritDoc} */ + @Override public boolean isClient() { + return clientRouterNodeId != null; + } + + /** + * Decrements alive check value and returns new one. + * + * @return Alive check value. + */ + public int decrementAliveCheck() { + assert isClient(); + + return --aliveCheck; + } + + /** + * @param aliveCheck Alive check value. + */ + public void aliveCheck(int aliveCheck) { + assert isClient(); + + this.aliveCheck = aliveCheck; + } + + /** + * @return Client router node ID. + */ + public UUID clientRouterNodeId() { + return clientRouterNodeId; + } + + /** + * @param clientRouterNodeId Client router node ID. 
+ */ + public void clientRouterNodeId(UUID clientRouterNodeId) { + this.clientRouterNodeId = clientRouterNodeId; + } + + /** {@inheritDoc} */ + @Override public int compareTo(@Nullable TcpDiscoveryNode node) { + if (node == null) + return 1; + + if (internalOrder() == node.internalOrder()) + assert id().equals(node.id()) : "Duplicate order [this=" + this + ", other=" + node + ']'; + + return internalOrder() < node.internalOrder() ? -1 : internalOrder() > node.internalOrder() ? 1 : + id().compareTo(node.id()); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeUuid(out, id); + U.writeMap(out, attrs); + U.writeCollection(out, addrs); + U.writeCollection(out, hostNames); + out.writeInt(discPort); + + byte[] mtr = null; + + if (metrics != null) { + mtr = new byte[DiscoveryMetricsHelper.METRICS_SIZE]; + + DiscoveryMetricsHelper.serialize(mtr, 0, metrics); + } + + U.writeByteArray(out, mtr); + + out.writeLong(order); + out.writeLong(intOrder); + out.writeObject(ver); + U.writeUuid(out, clientRouterNodeId); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + id = U.readUuid(in); + + attrs = U.sealMap(U.<String, Object>readMap(in)); + addrs = U.readCollection(in); + hostNames = U.readCollection(in); + discPort = in.readInt(); + + sockAddrs = U.toSocketAddresses(this, discPort); + + consistentId = U.consistentId(addrs, discPort); + + byte[] mtr = U.readByteArray(in); + + if (mtr != null) + metrics = DiscoveryMetricsHelper.deserialize(mtr, 0); + + order = in.readLong(); + intOrder = in.readLong(); + ver = (IgniteProductVersion)in.readObject(); + clientRouterNodeId = U.readUuid(in); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return id.hashCode(); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + return F.eqNodes(this, o); + } + + /** {@inheritDoc} */ + @Override public String toString() 
{ + return S.toString(TcpDiscoveryNode.class, this, "isClient", isClient()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/191aae27/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java new file mode 100644 index 0000000..b9093c1 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java @@ -0,0 +1,636 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.spi.discovery.tcp.internal; + +import org.apache.ignite.cluster.*; +import org.apache.ignite.lang.*; +import org.gridgain.grid.util.tostring.*; +import org.gridgain.grid.util.typedef.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.util.*; +import java.util.concurrent.locks.*; + +/** + * Convenient way to represent topology for {@link org.gridgain.grid.spi.discovery.tcp.TcpDiscoverySpi} + */ +public class TcpDiscoveryNodesRing { + /** Visible nodes filter. */ + private static final IgnitePredicate<TcpDiscoveryNode> VISIBLE_NODES = new P1<TcpDiscoveryNode>() { + @Override public boolean apply(TcpDiscoveryNode node) { + return node.visible(); + } + }; + + /** Client nodes filter. */ + private static final PN CLIENT_NODES = new PN() { + @Override public boolean apply(ClusterNode node) { + return node.isClient(); + } + }; + + /** Local node. 
*/ + private TcpDiscoveryNode locNode; + + /** All nodes in topology. */ + @GridToStringInclude + private NavigableSet<TcpDiscoveryNode> nodes = new TreeSet<>(); + + /** All started nodes. */ + @GridToStringExclude + private Map<UUID, TcpDiscoveryNode> nodesMap = new HashMap<>(); + + /** Current topology version */ + private long topVer; + + /** */ + private long nodeOrder; + + /** Lock. */ + @GridToStringExclude + private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); + + /** + * Sets local node. + * + * @param locNode Local node. + */ + public void localNode(TcpDiscoveryNode locNode) { + assert locNode != null; + + rwLock.writeLock().lock(); + + try { + this.locNode = locNode; + + clear(); + } + finally { + rwLock.writeLock().unlock(); + } + } + + /** + * Gets all nodes in the topology. + * + * @return Collection of all nodes. + */ + public Collection<TcpDiscoveryNode> allNodes() { + return nodes(); + } + + /** + * Gets visible nodes in the topology. + * + * @return Collection of visible nodes. + */ + public Collection<TcpDiscoveryNode> visibleNodes() { + return nodes(VISIBLE_NODES); + } + + /** + * Gets remote nodes. + * + * @return Collection of remote nodes in grid. + */ + public Collection<TcpDiscoveryNode> remoteNodes() { + return nodes(F.remoteNodes(locNode.id())); + } + + /** + * Gets visible remote nodes in the topology. + * + * @return Collection of visible remote nodes. + */ + public Collection<TcpDiscoveryNode> visibleRemoteNodes() { + return nodes(VISIBLE_NODES, F.remoteNodes(locNode.id())); + } + + /** + * @return Client nodes. + */ + public Collection<TcpDiscoveryNode> clientNodes() { + return nodes(CLIENT_NODES); + } + + /** + * Checks whether the topology has remote nodes in. + * + * @return {@code true} if the topology has remote nodes in. 
+ */ + public boolean hasRemoteNodes() { + rwLock.readLock().lock(); + + try { + return nodes.size() > 1; + } + finally { + rwLock.readLock().unlock(); + } + } + + /** + * Adds node to topology, also initializes node last update time with current + * system time. + * + * @param node Node to add. + * @return {@code true} if such node was added and did not present previously in the topology. + */ + public boolean add(TcpDiscoveryNode node) { + assert node != null; + assert node.internalOrder() > 0; + + rwLock.writeLock().lock(); + + try { + if (nodesMap.containsKey(node.id())) + return false; + + assert node.internalOrder() > maxInternalOrder() : "Adding node to the middle of the ring " + + "[ring=" + this + ", node=" + node + ']'; + + nodesMap.put(node.id(), node); + + nodes = new TreeSet<>(nodes); + + node.lastUpdateTime(U.currentTimeMillis()); + + nodes.add(node); + + nodeOrder = node.internalOrder(); + } + finally { + rwLock.writeLock().unlock(); + } + + return true; + } + + /** + * @return Max internal order. + */ + public long maxInternalOrder() { + rwLock.readLock().lock(); + + try { + TcpDiscoveryNode last = nodes.last(); + + return last != null ? last.internalOrder() : -1; + } + finally { + rwLock.readLock().unlock(); + } + } + + /** + * Restores topology from parameters values. + * <p> + * This method is called when new node receives topology from coordinator. + * In this case all nodes received are remote for local. + * <p> + * Also initializes nodes last update time with current system time. + * + * @param nodes List of remote nodes. + * @param topVer Topology version. 
+     */
+    public void restoreTopology(Iterable<TcpDiscoveryNode> nodes, long topVer) {
+        assert !F.isEmpty(nodes);
+        assert topVer > 0;
+
+        rwLock.writeLock().lock();
+
+        try {
+            // The local node takes the received topology version as its internal order.
+            locNode.internalOrder(topVer);
+
+            // Reset the ring so that it contains the local node only.
+            clear();
+
+            boolean firstAdd = true;
+
+            for (TcpDiscoveryNode node : nodes) {
+                // Skip nodes that are already known by ID (the local node at least).
+                if (nodesMap.containsKey(node.id()))
+                    continue;
+
+                nodesMap.put(node.id(), node);
+
+                // Copy the set once, right before the first modification.
+                if (firstAdd) {
+                    this.nodes = new TreeSet<>(this.nodes);
+
+                    firstAdd = false;
+                }
+
+                node.lastUpdateTime(U.currentTimeMillis());
+
+                this.nodes.add(node);
+            }
+
+            nodeOrder = topVer;
+        }
+        finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Finds node by ID.
+     *
+     * @param nodeId Node id to find.
+     * @return Node with ID provided or {@code null} if not found.
+     */
+    @Nullable public TcpDiscoveryNode node(UUID nodeId) {
+        assert nodeId != null;
+
+        rwLock.readLock().lock();
+
+        try {
+            return nodesMap.get(nodeId);
+        }
+        finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Removes node from the topology.
+     *
+     * @param nodeId ID of the node to remove (must not be the local node ID).
+     * @return Removed node or {@code null} if no node with such ID was in the topology.
+     */
+    @Nullable public TcpDiscoveryNode removeNode(UUID nodeId) {
+        assert nodeId != null;
+        assert !locNode.id().equals(nodeId);
+
+        rwLock.writeLock().lock();
+
+        try {
+            TcpDiscoveryNode rmv = nodesMap.remove(nodeId);
+
+            if (rmv != null) {
+                // Modify a copy, not the previously published set instance.
+                nodes = new TreeSet<>(nodes);
+
+                nodes.remove(rmv);
+            }
+
+            return rmv;
+        }
+        finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Removes nodes from the topology.
+     *
+     * @param nodeIds IDs of the nodes to remove.
+     * @return Collection of removed nodes.
+     */
+    public Collection<TcpDiscoveryNode> removeNodes(Collection<UUID> nodeIds) {
+        assert !F.isEmpty(nodeIds);
+
+        rwLock.writeLock().lock();
+
+        try {
+            // Stays null until the first node is actually removed.
+            Collection<TcpDiscoveryNode> removed = null;
+
+            for (UUID nodeId : nodeIds) {
+                TcpDiscoveryNode node = nodesMap.remove(nodeId);
+
+                if (node == null)
+                    continue;
+
+                if (removed == null) {
+                    // First hit: switch to a copy of the set and allocate the result.
+                    nodes = new TreeSet<>(nodes);
+
+                    removed = new ArrayList<>(nodeIds.size());
+                }
+
+                nodes.remove(node);
+
+                removed.add(node);
+            }
+
+            if (removed != null)
+                return removed;
+
+            return Collections.<TcpDiscoveryNode>emptyList();
+        }
+        finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Drops every node except the local one and resets node order and topology version to zero.
+     * <p>
+     * Intended for the case when SPI gets disconnected from topology and is going to
+     * reconnect afterwards.
+     */
+    public void clear() {
+        rwLock.writeLock().lock();
+
+        try {
+            NavigableSet<TcpDiscoveryNode> freshNodes = new TreeSet<>();
+            Map<UUID, TcpDiscoveryNode> freshMap = new HashMap<>();
+
+            if (locNode != null) {
+                freshNodes.add(locNode);
+
+                freshMap.put(locNode.id(), locNode);
+            }
+
+            nodes = freshNodes;
+            nodesMap = freshMap;
+
+            nodeOrder = 0;
+            topVer = 0;
+        }
+        finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Looks up the coordinator of the topology.
+     *
+     * @return Coordinator node that gives versions to topology (node with the smallest order)
+     *      or {@code null} if the ring is empty.
+     */
+    @Nullable public TcpDiscoveryNode coordinator() {
+        rwLock.readLock().lock();
+
+        try {
+            return F.isEmpty(nodes) ? null : coordinator(null);
+        }
+        finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Finds coordinator in the topology filtering excluded nodes from the search.
+     * <p>
+     * This may be used when handling current coordinator leave or failure.
+     *
+     * @param excluded Nodes to exclude from the search (optional).
+     * @return Coordinator node among remaining nodes or {@code null} if all nodes are excluded.
+     */
+    @Nullable public TcpDiscoveryNode coordinator(@Nullable Collection<TcpDiscoveryNode> excluded) {
+        rwLock.readLock().lock();
+
+        try {
+            Collection<TcpDiscoveryNode> srvNodes = serverNodes(excluded);
+
+            // The coordinator is the minimal remaining server node.
+            return F.isEmpty(srvNodes) ? null : Collections.min(srvNodes);
+        }
+        finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Looks up the node that follows the local one in the topology.
+     *
+     * @return Next node or {@code null} if the ring holds fewer than two nodes.
+     */
+    @Nullable public TcpDiscoveryNode nextNode() {
+        rwLock.readLock().lock();
+
+        try {
+            return nodes.size() < 2 ? null : nextNode(null);
+        }
+        finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Looks up the node that follows the local one, ignoring the excluded nodes.
+     * <p>
+     * Useful when node failures are being detected and handled.
+     *
+     * @param excluded Nodes to exclude from the search (optional). If provided,
+     *      cannot contain local node.
+     * @return Next node or {@code null} if fewer than two server nodes remain
+     *      after filtering.
+     */
+    @Nullable public TcpDiscoveryNode nextNode(@Nullable Collection<TcpDiscoveryNode> excluded) {
+        assert excluded == null || excluded.isEmpty() || !excluded.contains(locNode);
+
+        rwLock.readLock().lock();
+
+        try {
+            Collection<TcpDiscoveryNode> srvNodes = serverNodes(excluded);
+
+            if (srvNodes.size() < 2)
+                return null;
+
+            Iterator<TcpDiscoveryNode> it = srvNodes.iterator();
+
+            // Advance the iterator until it has just stepped over the local node.
+            boolean locFound = false;
+
+            while (!locFound && it.hasNext())
+                locFound = locNode.equals(it.next());
+
+            // Successor of the local node, or wrap around to the first node.
+            if (it.hasNext())
+                return it.next();
+
+            return F.first(srvNodes);
+        }
+        finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Finds previous node in the topology.
+     *
+     * @return Previous node.
+     */
+    @Nullable public TcpDiscoveryNode previousNode() {
+        rwLock.readLock().lock();
+
+        try {
+            if (nodes.size() < 2)
+                return null;
+
+            return previousNode(null);
+        }
+        finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Finds previous node in the topology filtering excluded nodes from search.
+     *
+     * @param excluded Nodes to exclude from the search (optional). If provided,
+     *      cannot contain local node.
+     * @return Previous node or {@code null} if all nodes were filtered out or
+     *      topology contains less than two nodes.
+     */
+    @Nullable public TcpDiscoveryNode previousNode(@Nullable Collection<TcpDiscoveryNode> excluded) {
+        assert excluded == null || excluded.isEmpty() || !excluded.contains(locNode);
+
+        rwLock.readLock().lock();
+
+        try {
+            Collection<TcpDiscoveryNode> filtered = serverNodes(excluded);
+
+            if (filtered.size() < 2)
+                return null;
+
+            TcpDiscoveryNode prev = null;
+
+            for (TcpDiscoveryNode node : filtered) {
+                // Stop at the local node once a predecessor has been seen. When the local node
+                // comes first (no predecessor yet) keep walking: the ring wraps around, so its
+                // predecessor is the last node of the collection.
+                if (prev != null && locNode.equals(node))
+                    break;
+
+                prev = node;
+            }
+
+            return prev;
+        }
+        finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Gets current topology version.
+     *
+     * @return Current topology version.
+     */
+    public long topologyVersion() {
+        rwLock.readLock().lock();
+
+        try {
+            return topVer;
+        }
+        finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Sets new topology version.
+     *
+     * @param topVer New topology version (should be greater than current, otherwise no-op).
+     * @return {@code True} if topology version has been changed.
+     */
+    public boolean topologyVersion(long topVer) {
+        rwLock.writeLock().lock();
+
+        try {
+            if (this.topVer < topVer) {
+                this.topVer = topVer;
+
+                return true;
+            }
+
+            // Stale or equal version: documented no-op (leftover debug output removed).
+            return false;
+        }
+        finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Increments topology version and gets new value.
+     *
+     * @return Topology version (incremented).
+     */
+    public long incrementTopologyVersion() {
+        rwLock.writeLock().lock();
+
+        try {
+            return ++topVer;
+        }
+        finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Increments node order and gets new value. If the order has not been initialized yet
+     * (equals {@code 0}), it is first taken from the last node of the ring.
+     *
+     * @return Node order (incremented).
+     */
+    public long nextNodeOrder() {
+        rwLock.writeLock().lock();
+
+        try {
+            if (nodeOrder == 0) {
+                // NOTE(review): TreeSet.last() throws NoSuchElementException on an empty set,
+                // so this relies on the ring holding at least one node - confirm with callers.
+                TcpDiscoveryNode last = nodes.last();
+
+                assert last != null;
+
+                nodeOrder = last.internalOrder();
+            }
+
+            return ++nodeOrder;
+        }
+        finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Collects nodes accepted by the given filters into a list while holding the read lock.
+     *
+     * @param p Filters.
+     * @return Unmodifiable collection of nodes.
+     */
+    private Collection<TcpDiscoveryNode> nodes(IgnitePredicate<? super TcpDiscoveryNode>... p) {
+        rwLock.readLock().lock();
+
+        try {
+            List<TcpDiscoveryNode> list = U.arrayList(nodes, p);
+
+            return Collections.unmodifiableCollection(list);
+        }
+        finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Gets server nodes from topology.
+     *
+     * @param excluded Nodes to exclude from the search (optional).
+     * @return Collection of server nodes.
+     */
+    private Collection<TcpDiscoveryNode> serverNodes(@Nullable final Collection<TcpDiscoveryNode> excluded) {
+        // No locking here: every caller in this class invokes the method while holding the read lock.
+        final boolean excludedEmpty = F.isEmpty(excluded);
+
+        return F.view(nodes, new P1<TcpDiscoveryNode>() {
+            @Override public boolean apply(TcpDiscoveryNode node) {
+                // Keep non-client nodes that are not explicitly excluded.
+                return !node.isClient() && (excludedEmpty || !excluded.contains(node));
+            }
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        rwLock.readLock().lock();
+
+        try {
+            return S.toString(TcpDiscoveryNodesRing.class, this);
+        }
+        finally {
+            rwLock.readLock().unlock();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/191aae27/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/TcpDiscoverySpiState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/TcpDiscoverySpiState.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/TcpDiscoverySpiState.java
new file mode 100644
index 0000000..7ed3e0a
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/TcpDiscoverySpiState.java
@@ -0,0 +1,45 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.spi.discovery.tcp.internal;
+
+/**
+ * State of local node {@link org.gridgain.grid.spi.discovery.tcp.TcpDiscoverySpi}.
+ */
+public enum TcpDiscoverySpiState {
+    /** */
+    DISCONNECTED,
+
+    /** */
+    CONNECTING,
+
+    /** */
+    CONNECTED,
+
+    /** */
+    DISCONNECTING,
+
+    /** */
+    STOPPING,
+
+    /** */
+    LEFT,
+
+    /** */
+    DUPLICATE_ID,
+
+    /** */
+    AUTH_FAILED,
+
+    /** */
+    CHECK_FAILED,
+
+    /** */
+    LOOPBACK_PROBLEM
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/191aae27/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java
new file mode 100644
index 0000000..1155304
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java
@@ -0,0 +1,639 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.spi.discovery.tcp.internal;
+
+import org.apache.ignite.lang.*;
+import org.gridgain.grid.spi.discovery.tcp.messages.*;
+import org.gridgain.grid.util.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.gridgain.grid.util.tostring.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Statistics for {@link org.gridgain.grid.spi.discovery.tcp.TcpDiscoverySpi}.
+ * <p>
+ * Methods that touch the plain counters and maps are {@code synchronized} on this instance;
+ * the coordinator-since timestamp is kept in an {@link AtomicLong} and is accessed without the monitor.
+ */
+public class TcpDiscoveryStatistics {
+    /** Join started timestamp. */
+    private long joinStartedTs;
+
+    /** Join finished timestamp. */
+    private long joinFinishedTs;
+
+    /** Coordinator since timestamp.
*/
+    private final AtomicLong crdSinceTs = new AtomicLong();
+
+    /** Joined nodes count. */
+    private int joinedNodesCnt;
+
+    /** Failed nodes count. */
+    private int failedNodesCnt;
+
+    /** Left nodes count. */
+    private int leftNodesCnt;
+
+    /** Ack timeouts count. */
+    private int ackTimeoutsCnt;
+
+    /** Socket timeouts count. */
+    private int sockTimeoutsCnt;
+
+    /** Received messages count, keyed by message class simple name. */
+    @GridToStringInclude
+    private final Map<String, Integer> rcvdMsgs = new HashMap<>();
+
+    /** Processed messages count, keyed by message class simple name. */
+    @GridToStringInclude
+    private final Map<String, Integer> procMsgs = new HashMap<>();
+
+    /** Average time taken to serialize messages, keyed by message class simple name. */
+    @GridToStringInclude
+    private final Map<String, Long> avgMsgsSndTimes = new HashMap<>();
+
+    /** Max time taken to serialize messages, keyed by message class simple name. */
+    @GridToStringInclude
+    private final Map<String, Long> maxMsgsSndTimes = new HashMap<>();
+
+    /** Sent messages count, keyed by message class simple name. */
+    @GridToStringInclude
+    private final Map<String, Integer> sentMsgs = new HashMap<>();
+
+    /** Messages receive timestamps, keyed by message ID. */
+    private final Map<IgniteUuid, Long> msgsRcvTs = new GridBoundedLinkedHashMap<>(1024);
+
+    /** Messages processing start timestamps, keyed by message ID. */
+    private final Map<IgniteUuid, Long> msgsProcStartTs = new GridBoundedLinkedHashMap<>(1024);
+
+    /** Ring messages sent timestamps, keyed by message ID. */
+    private final Map<IgniteUuid, Long> ringMsgsSndTs = new GridBoundedLinkedHashMap<>(1024);
+
+    /** Average time a message spends in the queue. */
+    private long avgMsgQueueTime;
+
+    /** Max time a message spends in the queue. */
+    private long maxMsgQueueTime;
+
+    /** Total number of ring messages sent. */
+    private int ringMsgsSent;
+
+    /** Average time it takes for messages to pass the full ring. */
+    private long avgRingMsgTime;
+
+    /** Max time it takes for messages to pass the full ring. */
+    private long maxRingMsgTime;
+
+    /** Class name of ring message that required the biggest time for full ring traverse.
*/
+    private String maxRingTimeMsgCls;
+
+    /** Average message processing time. */
+    private long avgMsgProcTime;
+
+    /** Max message processing time. */
+    private long maxMsgProcTime;
+
+    /** Class name of the message that required the biggest time to process. */
+    private String maxProcTimeMsgCls;
+
+    /** Socket readers created count. */
+    private int sockReadersCreated;
+
+    /** Socket readers removed count. */
+    private int sockReadersRmv;
+
+    /** Average time it takes to initialize connection from another node. */
+    private long avgSrvSockInitTime;
+
+    /** Max time it takes to initialize connection from another node. */
+    private long maxSrvSockInitTime;
+
+    /** Number of outgoing connections established. */
+    private int clientSockCreatedCnt;
+
+    /** Average time it takes to connect to another node. */
+    private long avgClientSockInitTime;
+
+    /** Max time it takes to connect to another node. */
+    private long maxClientSockInitTime;
+
+    /** Pending messages registered count. */
+    private int pendingMsgsRegistered;
+
+    /** Pending messages discarded count. */
+    private int pendingMsgsDiscarded;
+
+    /**
+     * Increments joined nodes count.
+     */
+    public synchronized void onNodeJoined() {
+        joinedNodesCnt++;
+    }
+
+    /**
+     * Increments left nodes count.
+     */
+    public synchronized void onNodeLeft() {
+        leftNodesCnt++;
+    }
+
+    /**
+     * Increments failed nodes count.
+     */
+    public synchronized void onNodeFailed() {
+        failedNodesCnt++;
+    }
+
+    /**
+     * Increments ack timeouts count.
+     */
+    public synchronized void onAckTimeout() {
+        ackTimeoutsCnt++;
+    }
+
+    /**
+     * Increments socket timeouts count.
+     */
+    public synchronized void onSocketTimeout() {
+        sockTimeoutsCnt++;
+    }
+
+    /**
+     * Initializes coordinator since date (if needed).
+     */
+    public void onBecomingCoordinator() {
+        // Only the first call after construction or clear() sets the timestamp (CAS from 0).
+        crdSinceTs.compareAndSet(0, U.currentTimeMillis());
+    }
+
+    /**
+     * Initializes join started timestamp.
+     */
+    public synchronized void onJoinStarted() {
+        joinStartedTs = U.currentTimeMillis();
+    }
+
+    /**
+     * Initializes join finished timestamp.
+     */
+    public synchronized void onJoinFinished() {
+        joinFinishedTs = U.currentTimeMillis();
+    }
+
+    /**
+     * @return Join started timestamp.
+     */
+    public synchronized long joinStarted() {
+        return joinStartedTs;
+    }
+
+    /**
+     * @return Join finished timestamp.
+     */
+    public synchronized long joinFinished() {
+        return joinFinishedTs;
+    }
+
+    /**
+     * Collects necessary stats for message received by SPI: increments the per-class received
+     * counter and remembers the receive timestamp by message ID.
+     *
+     * @param msg Received message.
+     */
+    public synchronized void onMessageReceived(GridTcpDiscoveryAbstractMessage msg) {
+        assert msg != null;
+
+        String msgCls = msg.getClass().getSimpleName();
+
+        Integer cnt = F.addIfAbsent(rcvdMsgs, msgCls, new Callable<Integer>() {
+            @Override public Integer call() {
+                return 0;
+            }
+        });
+
+        assert cnt != null;
+
+        rcvdMsgs.put(msgCls, ++cnt);
+
+        msgsRcvTs.put(msg.id(), U.currentTimeMillis());
+    }
+
+    /**
+     * Collects necessary stats for message whose processing is starting: increments the per-class
+     * processed counter, updates queue time statistics and remembers the processing start timestamp.
+     *
+     * @param msg Message being processed.
+     */
+    public synchronized void onMessageProcessingStarted(GridTcpDiscoveryAbstractMessage msg) {
+        assert msg != null;
+
+        String msgCls = msg.getClass().getSimpleName();
+
+        Integer cnt = F.addIfAbsent(procMsgs, msgCls, new Callable<Integer>() {
+            @Override public Integer call() {
+                return 0;
+            }
+        });
+
+        assert cnt != null;
+
+        procMsgs.put(msgCls, ++cnt);
+
+        Long rcvdTs = msgsRcvTs.remove(msg.id());
+
+        // Receive timestamp may be missing, in which case queue time statistics are left untouched.
+        if (rcvdTs != null) {
+            long duration = U.currentTimeMillis() - rcvdTs;
+
+            if (maxMsgQueueTime < duration)
+                maxMsgQueueTime = duration;
+
+            // Running average over processed messages, same formula as the other averages of this class.
+            // The counter has just been incremented above, so the divisor is at least 1.
+            int procCnt = totalProcessedMessages();
+
+            avgMsgQueueTime = (avgMsgQueueTime * (procCnt - 1) + duration) / procCnt;
+        }
+
+        msgsProcStartTs.put(msg.id(), U.currentTimeMillis());
+    }
+
+    /**
+     * Collects necessary stats for message processed by SPI.
+     *
+     * @param msg Processed message.
+     */
+    public synchronized void onMessageProcessingFinished(GridTcpDiscoveryAbstractMessage msg) {
+        assert msg != null;
+
+        Long startTs = msgsProcStartTs.remove(msg.id());
+
+        if (startTs != null) {
+            long duration = U.currentTimeMillis() - startTs;
+
+            // clear() empties the processed counters but not the start timestamps, so the count
+            // may be zero here; skip the average in that case instead of dividing by zero.
+            int procCnt = totalProcessedMessages();
+
+            if (procCnt > 0)
+                avgMsgProcTime = (avgMsgProcTime * (procCnt - 1) + duration) / procCnt;
+
+            if (duration > maxMsgProcTime) {
+                maxMsgProcTime = duration;
+
+                maxProcTimeMsgCls = msg.getClass().getSimpleName();
+            }
+        }
+    }
+
+    /**
+     * Collects necessary stats for a sent message. Node added, left and failed messages sent while
+     * the local node is coordinator are additionally tracked as ring messages.
+     *
+     * @param msg Sent message.
+     * @param time Time taken to serialize message.
+     */
+    public synchronized void onMessageSent(GridTcpDiscoveryAbstractMessage msg, long time) {
+        assert msg != null;
+        assert time >= 0;
+
+        // The message type alternatives are grouped explicitly: '&&' binds tighter than '||',
+        // so without the parentheses only the first alternative would require coordinator role.
+        boolean ringMsg = msg instanceof GridTcpDiscoveryNodeAddedMessage ||
+            msg instanceof GridTcpDiscoveryNodeLeftMessage ||
+            msg instanceof GridTcpDiscoveryNodeFailedMessage;
+
+        if (crdSinceTs.get() > 0 && ringMsg) {
+            ringMsgsSndTs.put(msg.id(), U.currentTimeMillis());
+
+            ringMsgsSent++;
+        }
+
+        String msgCls = msg.getClass().getSimpleName();
+
+        Integer cnt = F.addIfAbsent(sentMsgs, msgCls, new Callable<Integer>() {
+            @Override public Integer call() {
+                return 0;
+            }
+        });
+
+        assert cnt != null;
+
+        sentMsgs.put(msgCls, ++cnt);
+
+        Long avgTime = F.addIfAbsent(avgMsgsSndTimes, msgCls, new Callable<Long>() {
+            @Override public Long call() {
+                return 0L;
+            }
+        });
+
+        assert avgTime != null;
+
+        // Running average; cnt has just been incremented, so it is at least 1.
+        avgTime = (avgTime * (cnt - 1) + time) / cnt;
+
+        avgMsgsSndTimes.put(msgCls, avgTime);
+
+        Long maxTime = F.addIfAbsent(maxMsgsSndTimes, msgCls, new Callable<Long>() {
+            @Override public Long call() {
+                return 0L;
+            }
+        });
+
+        assert maxTime != null;
+
+        if (time > maxTime)
+            maxMsgsSndTimes.put(msgCls, time);
+    }
+
+    /**
+     * Called by coordinator when ring message makes full pass.
+     *
+     * @param msg Message.
+     */
+    public synchronized void onRingMessageReceived(GridTcpDiscoveryAbstractMessage msg) {
+        assert msg != null;
+
+        Long sentTs = ringMsgsSndTs.get(msg.id());
+
+        // Only messages registered as ring messages on send are measured.
+        if (sentTs != null) {
+            long duration = U.currentTimeMillis() - sentTs;
+
+            if (maxRingMsgTime < duration) {
+                maxRingMsgTime = duration;
+
+                maxRingTimeMsgCls = msg.getClass().getSimpleName();
+            }
+
+            if (ringMsgsSent != 0)
+                avgRingMsgTime = (avgRingMsgTime * (ringMsgsSent - 1) + duration) / ringMsgsSent;
+        }
+    }
+
+    /**
+     * Gets max time for ring message to make full pass.
+     *
+     * @return Max full pass time.
+     */
+    public synchronized long maxRingMessageTime() {
+        return maxRingMsgTime;
+    }
+
+    /**
+     * Gets class name of the message that took max time to make full pass.
+     *
+     * @return Message class name.
+     */
+    public synchronized String maxRingDurationMessageClass() {
+        return maxRingTimeMsgCls;
+    }
+
+    /**
+     * Gets class name of the message that took max time to process.
+     *
+     * @return Message class name.
+     */
+    public synchronized String maxProcessingTimeMessageClass() {
+        return maxProcTimeMsgCls;
+    }
+
+    /**
+     * Updates max and average initialization time of connections accepted from other nodes.
+     * The average is computed over the socket readers created count.
+     *
+     * @param initTime Time socket was initialized in.
+     */
+    public synchronized void onServerSocketInitialized(long initTime) {
+        assert initTime >= 0;
+
+        if (maxSrvSockInitTime < initTime)
+            maxSrvSockInitTime = initTime;
+
+        // The divisor is maintained by onSocketReaderCreated() and reset by clear(); guard against
+        // zero the same way the ring message average does.
+        if (sockReadersCreated != 0)
+            avgSrvSockInitTime = (avgSrvSockInitTime * (sockReadersCreated - 1) + initTime) / sockReadersCreated;
+    }
+
+    /**
+     * Increments outgoing connections count and updates max and average time it takes
+     * to connect to another node.
+     *
+     * @param initTime Time socket was initialized in.
+     */
+    public synchronized void onClientSocketInitialized(long initTime) {
+        assert initTime >= 0;
+
+        clientSockCreatedCnt++;
+
+        if (maxClientSockInitTime < initTime)
+            maxClientSockInitTime = initTime;
+
+        avgClientSockInitTime = (avgClientSockInitTime * (clientSockCreatedCnt - 1) + initTime) / clientSockCreatedCnt;
+    }
+
+    /**
+     * Increments pending messages registered count.
+     */
+    public synchronized void onPendingMessageRegistered() {
+        pendingMsgsRegistered++;
+    }
+
+    /**
+     * Increments pending messages discarded count.
+     */
+    public synchronized void onPendingMessageDiscarded() {
+        pendingMsgsDiscarded++;
+    }
+
+    /**
+     * Increments socket readers created count.
+     */
+    public synchronized void onSocketReaderCreated() {
+        sockReadersCreated++;
+    }
+
+    /**
+     * Increments socket readers removed count.
+     */
+    public synchronized void onSocketReaderRemoved() {
+        sockReadersRmv++;
+    }
+
+    /**
+     * Gets processed messages counts (grouped by type).
+     *
+     * @return Copy of the map containing message types and respective counts.
+     */
+    public synchronized Map<String, Integer> processedMessages() {
+        return new HashMap<>(procMsgs);
+    }
+
+    /**
+     * Gets received messages counts (grouped by type).
+     *
+     * @return Copy of the map containing message types and respective counts.
+     */
+    public synchronized Map<String, Integer> receivedMessages() {
+        return new HashMap<>(rcvdMsgs);
+    }
+
+    /**
+     * Gets max messages send time (grouped by type).
+     *
+     * @return Copy of the map containing messages types and max send times.
+     */
+    public synchronized Map<String, Long> maxMessagesSendTimes() {
+        return new HashMap<>(maxMsgsSndTimes);
+    }
+
+    /**
+     * Gets average messages send time (grouped by type).
+     *
+     * @return Copy of the map containing messages types and average send times.
+     */
+    public synchronized Map<String, Long> avgMessagesSendTimes() {
+        return new HashMap<>(avgMsgsSndTimes);
+    }
+
+    /**
+     * Gets total received messages count (sum over all message types).
+     *
+     * @return Total received messages count.
+     */
+    public synchronized int totalReceivedMessages() {
+        return F.sumInt(receivedMessages().values());
+    }
+
+    /**
+     * Gets total processed messages count (sum over all message types).
+     *
+     * @return Total processed messages count.
+     */
+    public synchronized int totalProcessedMessages() {
+        return F.sumInt(processedMessages().values());
+    }
+
+    /**
+     * Gets max message processing time.
+     *
+     * @return Max message processing time.
+     */
+    public synchronized long maxMessageProcessingTime(){
+        return maxMsgProcTime;
+    }
+
+    /**
+     * Gets average message processing time.
+     *
+     * @return Average message processing time.
+     */
+    public synchronized long avgMessageProcessingTime() {
+        return avgMsgProcTime;
+    }
+
+    /**
+     * Gets pending messages registered count.
+     *
+     * @return Pending messages registered count.
+     */
+    public synchronized long pendingMessagesRegistered() {
+        return pendingMsgsRegistered;
+    }
+
+    /**
+     * Gets pending messages discarded count.
+     *
+     * @return Pending messages discarded count.
+     */
+    public synchronized long pendingMessagesDiscarded() {
+        return pendingMsgsDiscarded;
+    }
+
+    /**
+     * Gets nodes joined count.
+     *
+     * @return Nodes joined count.
+     */
+    public synchronized int joinedNodesCount() {
+        return joinedNodesCnt;
+    }
+
+    /**
+     * Gets nodes left count.
+     *
+     * @return Nodes left count.
+     */
+    public synchronized int leftNodesCount() {
+        return leftNodesCnt;
+    }
+
+    /**
+     * Gets failed nodes count.
+     *
+     * @return Failed nodes count.
+     */
+    public synchronized int failedNodesCount() {
+        return failedNodesCnt;
+    }
+
+    /**
+     * @return Ack timeouts count.
+     */
+    public synchronized int ackTimeoutsCount() {
+        return ackTimeoutsCnt;
+    }
+
+    /**
+     * @return Socket timeouts count.
+     */
+    public synchronized int socketTimeoutsCount() {
+        return sockTimeoutsCnt;
+    }
+
+    /**
+     * Gets socket readers created count.
+     *
+     * @return Socket readers created count.
+     */
+    public synchronized int socketReadersCreated() {
+        return sockReadersCreated;
+    }
+
+    /**
+     * Gets socket readers removed count.
+     *
+     * @return Socket readers removed count.
+     */
+    public synchronized int socketReadersRemoved() {
+        return sockReadersRmv;
+    }
+
+    /**
+     * Gets time local node has been coordinator since.
+     *
+     * @return Coordinator since timestamp ({@code 0} if it has not been set).
+     */
+    public long coordinatorSinceTimestamp() {
+        return crdSinceTs.get();
+    }
+
+    /**
+     * Clears statistics.
+     */
+    public synchronized void clear() {
+        // Resets counters, averages, maxima and per-type maps. The timestamp maps
+        // (msgsRcvTs, msgsProcStartTs, ringMsgsSndTs) are not touched here.
+        ackTimeoutsCnt = 0;
+        avgClientSockInitTime = 0;
+        avgMsgProcTime = 0;
+        avgMsgQueueTime = 0;
+        avgMsgsSndTimes.clear();
+        avgRingMsgTime = 0;
+        avgSrvSockInitTime = 0;
+        clientSockCreatedCnt = 0;
+        crdSinceTs.set(0);
+        failedNodesCnt = 0;
+        joinedNodesCnt = 0;
+        joinFinishedTs = 0;
+        joinStartedTs = 0;
+        leftNodesCnt = 0;
+        maxClientSockInitTime = 0;
+        maxMsgProcTime = 0;
+        maxMsgQueueTime = 0;
+        maxMsgsSndTimes.clear();
+        maxProcTimeMsgCls = null;
+        maxRingMsgTime = 0;
+        maxRingTimeMsgCls = null;
+        maxSrvSockInitTime = 0;
+        pendingMsgsDiscarded = 0;
+        pendingMsgsRegistered = 0;
+        procMsgs.clear();
+        rcvdMsgs.clear();
+        ringMsgsSent = 0;
+        sentMsgs.clear();
+        sockReadersCreated = 0;
+        sockReadersRmv = 0;
+        sockTimeoutsCnt = 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized String toString() {
+        return S.toString(TcpDiscoveryStatistics.class, this);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/191aae27/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/GridTcpDiscoveryDuplicateIdMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/GridTcpDiscoveryDuplicateIdMessage.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/GridTcpDiscoveryDuplicateIdMessage.java
index 5c3e8f0..7dba08a 100644
--- a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/GridTcpDiscoveryDuplicateIdMessage.java
+++ b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/GridTcpDiscoveryDuplicateIdMessage.java
@@ -24,7 +24,7 @@ public class GridTcpDiscoveryDuplicateIdMessage extends GridTcpDiscoveryAbstract
     private static final long serialVersionUID = 0L;
 
     /** Node with duplicate ID.
*/ - private GridTcpDiscoveryNode node; + private TcpDiscoveryNode node; /** * Public default no-arg constructor for {@link Externalizable} interface. @@ -39,7 +39,7 @@ public class GridTcpDiscoveryDuplicateIdMessage extends GridTcpDiscoveryAbstract * @param creatorNodeId Creator node ID. * @param node Node with same ID. */ - public GridTcpDiscoveryDuplicateIdMessage(UUID creatorNodeId, GridTcpDiscoveryNode node) { + public GridTcpDiscoveryDuplicateIdMessage(UUID creatorNodeId, TcpDiscoveryNode node) { super(creatorNodeId); assert node != null; @@ -50,7 +50,7 @@ public class GridTcpDiscoveryDuplicateIdMessage extends GridTcpDiscoveryAbstract /** * @return Node with duplicate ID. */ - public GridTcpDiscoveryNode node() { + public TcpDiscoveryNode node() { return node; } @@ -65,7 +65,7 @@ public class GridTcpDiscoveryDuplicateIdMessage extends GridTcpDiscoveryAbstract @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { super.readExternal(in); - node = (GridTcpDiscoveryNode)in.readObject(); + node = (TcpDiscoveryNode)in.readObject(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/191aae27/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/GridTcpDiscoveryJoinRequestMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/GridTcpDiscoveryJoinRequestMessage.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/GridTcpDiscoveryJoinRequestMessage.java index 597db81..ac3b1ea 100644 --- a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/GridTcpDiscoveryJoinRequestMessage.java +++ b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/GridTcpDiscoveryJoinRequestMessage.java @@ -24,7 +24,7 @@ public class GridTcpDiscoveryJoinRequestMessage extends GridTcpDiscoveryAbstract private static final long 
serialVersionUID = 0L; /** New node that wants to join the topology. */ - private GridTcpDiscoveryNode node; + private TcpDiscoveryNode node; /** Discovery data. */ private List<Object> discoData; @@ -42,7 +42,7 @@ public class GridTcpDiscoveryJoinRequestMessage extends GridTcpDiscoveryAbstract * @param node New node that wants to join. * @param discoData Discovery data. */ - public GridTcpDiscoveryJoinRequestMessage(GridTcpDiscoveryNode node, List<Object> discoData) { + public GridTcpDiscoveryJoinRequestMessage(TcpDiscoveryNode node, List<Object> discoData) { super(node.id()); this.node = node; @@ -54,7 +54,7 @@ public class GridTcpDiscoveryJoinRequestMessage extends GridTcpDiscoveryAbstract * * @return Node that wants to join the topology. */ - public GridTcpDiscoveryNode node() { + public TcpDiscoveryNode node() { return node; } @@ -91,7 +91,7 @@ public class GridTcpDiscoveryJoinRequestMessage extends GridTcpDiscoveryAbstract @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { super.readExternal(in); - node = (GridTcpDiscoveryNode)in.readObject(); + node = (TcpDiscoveryNode)in.readObject(); discoData = U.readList(in); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/191aae27/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/GridTcpDiscoveryNodeAddedMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/GridTcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/GridTcpDiscoveryNodeAddedMessage.java index 5749392..72e4d15 100644 --- a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/GridTcpDiscoveryNodeAddedMessage.java +++ b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/GridTcpDiscoveryNodeAddedMessage.java @@ -31,7 +31,7 @@ public class GridTcpDiscoveryNodeAddedMessage 
extends GridTcpDiscoveryAbstractMe private static final long serialVersionUID = 0L; /** Added node. */ - private GridTcpDiscoveryNode node; + private TcpDiscoveryNode node; /** Pending messages from previous node. */ private Collection<GridTcpDiscoveryAbstractMessage> msgs; @@ -41,7 +41,7 @@ public class GridTcpDiscoveryNodeAddedMessage extends GridTcpDiscoveryAbstractMe /** Current topology. Initialized by coordinator. */ @GridToStringInclude - private Collection<GridTcpDiscoveryNode> top; + private Collection<TcpDiscoveryNode> top; /** Topology snapshots history. */ private Map<Long, Collection<ClusterNode>> topHist; @@ -70,7 +70,7 @@ public class GridTcpDiscoveryNodeAddedMessage extends GridTcpDiscoveryAbstractMe * @param newNodeDiscoData New Node discovery data. * @param gridStartTime Start time of the first grid node. */ - public GridTcpDiscoveryNodeAddedMessage(UUID creatorNodeId, GridTcpDiscoveryNode node, + public GridTcpDiscoveryNodeAddedMessage(UUID creatorNodeId, TcpDiscoveryNode node, List<Object> newNodeDiscoData, long gridStartTime) { super(creatorNodeId); @@ -89,7 +89,7 @@ public class GridTcpDiscoveryNodeAddedMessage extends GridTcpDiscoveryAbstractMe * * @return New node. */ - public GridTcpDiscoveryNode node() { + public TcpDiscoveryNode node() { return node; } @@ -127,7 +127,7 @@ public class GridTcpDiscoveryNodeAddedMessage extends GridTcpDiscoveryAbstractMe * * @return Current topology. */ - @Nullable public Collection<GridTcpDiscoveryNode> topology() { + @Nullable public Collection<TcpDiscoveryNode> topology() { return top; } @@ -136,7 +136,7 @@ public class GridTcpDiscoveryNodeAddedMessage extends GridTcpDiscoveryAbstractMe * * @param top Current topology. 
*/ - public void topology(@Nullable Collection<GridTcpDiscoveryNode> top) { + public void topology(@Nullable Collection<TcpDiscoveryNode> top) { this.top = top; } @@ -221,7 +221,7 @@ public class GridTcpDiscoveryNodeAddedMessage extends GridTcpDiscoveryAbstractMe @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { super.readExternal(in); - node = (GridTcpDiscoveryNode)in.readObject(); + node = (TcpDiscoveryNode)in.readObject(); msgs = U.readCollection(in); discardMsgId = U.readGridUuid(in); top = U.readCollection(in); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/191aae27/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/GridTcpDiscoveryStatusCheckMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/GridTcpDiscoveryStatusCheckMessage.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/GridTcpDiscoveryStatusCheckMessage.java index 896ae80..4baebab 100644 --- a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/GridTcpDiscoveryStatusCheckMessage.java +++ b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/GridTcpDiscoveryStatusCheckMessage.java @@ -34,7 +34,7 @@ public class GridTcpDiscoveryStatusCheckMessage extends GridTcpDiscoveryAbstract public static final int STATUS_RECON = 2; /** Creator node. */ - private GridTcpDiscoveryNode creatorNode; + private TcpDiscoveryNode creatorNode; /** Failed node id. */ private UUID failedNodeId; @@ -55,7 +55,7 @@ public class GridTcpDiscoveryStatusCheckMessage extends GridTcpDiscoveryAbstract * @param creatorNode Creator node. * @param failedNodeId Failed node id. 
*/ - public GridTcpDiscoveryStatusCheckMessage(GridTcpDiscoveryNode creatorNode, UUID failedNodeId) { + public GridTcpDiscoveryStatusCheckMessage(TcpDiscoveryNode creatorNode, UUID failedNodeId) { super(creatorNode.id()); this.creatorNode = creatorNode; @@ -67,7 +67,7 @@ public class GridTcpDiscoveryStatusCheckMessage extends GridTcpDiscoveryAbstract * * @return Creator node. */ - public GridTcpDiscoveryNode creatorNode() { + public TcpDiscoveryNode creatorNode() { return creatorNode; } @@ -111,7 +111,7 @@ public class GridTcpDiscoveryStatusCheckMessage extends GridTcpDiscoveryAbstract @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { super.readExternal(in); - creatorNode = (GridTcpDiscoveryNode)in.readObject(); + creatorNode = (TcpDiscoveryNode)in.readObject(); failedNodeId = U.readUuid(in); status = in.readInt(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/191aae27/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCacheAffinityFunctionExcludeNeighborsAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCacheAffinityFunctionExcludeNeighborsAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCacheAffinityFunctionExcludeNeighborsAbstractSelfTest.java index e7b62d4..7a58541 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCacheAffinityFunctionExcludeNeighborsAbstractSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCacheAffinityFunctionExcludeNeighborsAbstractSelfTest.java @@ -118,10 +118,10 @@ public abstract class GridCacheAffinityFunctionExcludeNeighborsAbstractSelfTest GridCacheAffinity<Object> aff = affinity(g); - List<GridTcpDiscoveryNode> top = 
new ArrayList<>(); + List<TcpDiscoveryNode> top = new ArrayList<>(); for (ClusterNode node : g.cluster().nodes()) - top.add((GridTcpDiscoveryNode) node); + top.add((TcpDiscoveryNode) node); Collections.sort(top); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/191aae27/modules/core/src/test/java/org/gridgain/grid/spi/GridTcpSpiForwardingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/spi/GridTcpSpiForwardingSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/spi/GridTcpSpiForwardingSelfTest.java index 68bf3d9..01a57bf 100644 --- a/modules/core/src/test/java/org/gridgain/grid/spi/GridTcpSpiForwardingSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/spi/GridTcpSpiForwardingSelfTest.java @@ -98,7 +98,7 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest { Map<String, Object> attrs = new HashMap<>(node.attributes()); attrs.remove(createSpiAttributeName(ATTR_PORT)); - ((GridTcpDiscoveryNode)node).setAttributes(attrs); + ((TcpDiscoveryNode)node).setAttributes(attrs); return super.createTcpClient(node); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/191aae27/modules/core/src/test/java/org/gridgain/grid/spi/discovery/tcp/GridTcpDiscoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/spi/discovery/tcp/GridTcpDiscoverySelfTest.java b/modules/core/src/test/java/org/gridgain/grid/spi/discovery/tcp/GridTcpDiscoverySelfTest.java index 7817627..fbf60b7 100644 --- a/modules/core/src/test/java/org/gridgain/grid/spi/discovery/tcp/GridTcpDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/spi/discovery/tcp/GridTcpDiscoverySelfTest.java @@ -175,7 +175,7 @@ public class GridTcpDiscoverySelfTest extends GridCommonAbstractTest { } }, 4, "grid-starter"); - Collection<GridTcpDiscoveryNode> nodes = 
discoMap.get(g1.name()).ring().allNodes(); + Collection<TcpDiscoveryNode> nodes = discoMap.get(g1.name()).ring().allNodes(); ByteArrayOutputStream bos = new ByteArrayOutputStream(); @@ -314,7 +314,7 @@ public class GridTcpDiscoverySelfTest extends GridCommonAbstractTest { IgniteDiscoveryEvent discoEvt = (IgniteDiscoveryEvent)evt; - GridTcpDiscoveryNode node = ((GridTcpDiscoveryNode)discoMap.get(g1.name()). + TcpDiscoveryNode node = ((TcpDiscoveryNode)discoMap.get(g1.name()). getNode(discoEvt.eventNode().id())); assert node != null && node.visible();