http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cdb10bc1/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java new file mode 100644 index 0000000..4632eaf --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java @@ -0,0 +1,308 @@
/* @java.file.header */

/* _________ _____ __________________ _____
 * __ ____/___________(_)______ /__ ____/______ ____(_)_______
 * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
 * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
 * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
 */

package org.gridgain.grid.spi.discovery.tcp.messages;

import org.apache.ignite.cluster.*;
import org.gridgain.grid.spi.discovery.*;
import org.gridgain.grid.util.tostring.*;
import org.gridgain.grid.util.typedef.*;
import org.gridgain.grid.util.typedef.internal.*;

import java.io.*;
import java.util.*;

import static org.gridgain.grid.spi.discovery.DiscoveryMetricsHelper.*;

/**
 * Heartbeat message.
 * <p>
 * The coordinator sends it around the ring once per configured period.
 * The message makes two passes:
 * <ol>
 *      <li>On the first pass every node adds its metrics to the message and
 *      updates local metrics with the metrics already present in the message.</li>
 *      <li>On the second pass every node updates all metrics present in the message
 *      and removes its own metrics from the message.</li>
 * </ol>
 * When the message reaches the coordinator for the second time (which completes
 * the second pass) it is discarded.
 */
@TcpDiscoveryRedirectToClient
public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage {
    /** Serialization version. */
    private static final long serialVersionUID = 0L;

    /** Metrics sets keyed by the ID of the node they were registered for. */
    @GridToStringExclude
    private Map<UUID, MetricsSet> metrics;

    /** Client node IDs. */
    private Collection<UUID> clientNodeIds;

    /**
     * No-arg constructor required by {@link Externalizable}; the fields are
     * filled in by {@link #readExternal(ObjectInput)}.
     */
    public TcpDiscoveryHeartbeatMessage() {
        // No-op.
    }

    /**
     * Creates an empty heartbeat message.
     *
     * @param creatorNodeId Creator node.
     */
    public TcpDiscoveryHeartbeatMessage(UUID creatorNodeId) {
        super(creatorNodeId);

        this.metrics = U.newHashMap(1);
        this.clientNodeIds = new HashSet<>();
    }

    /**
     * Registers metrics of the given node. The node must not have metrics
     * in this message yet (checked by assertion).
     *
     * @param nodeId Node ID.
     * @param metrics Node metrics.
     */
    public void setMetrics(UUID nodeId, ClusterNodeMetrics metrics) {
        assert nodeId != null;
        assert metrics != null;
        assert !this.metrics.containsKey(nodeId);

        MetricsSet set = new MetricsSet(metrics);

        this.metrics.put(nodeId, set);
    }

    /**
     * Adds metrics of a client node to the metrics set of a server node.
     * The server node must already have metrics in this message (checked by assertion).
     *
     * @param nodeId Server node ID.
     * @param clientNodeId Client node ID.
     * @param metrics Node metrics.
     */
    public void setClientMetrics(UUID nodeId, UUID clientNodeId, ClusterNodeMetrics metrics) {
        assert nodeId != null;
        assert clientNodeId != null;
        assert metrics != null;
        assert this.metrics.containsKey(nodeId);

        MetricsSet srvSet = this.metrics.get(nodeId);

        srvSet.addClientMetrics(clientNodeId, metrics);
    }

    /**
     * Drops the metrics registered for the given node, if any.
     *
     * @param nodeId Node ID.
     */
    public void removeMetrics(UUID nodeId) {
        assert nodeId != null;

        this.metrics.remove(nodeId);
    }

    /**
     * Gets the metrics map. The internal map itself is returned, not a copy.
     *
     * @return Metrics map.
     */
    public Map<UUID, MetricsSet> metrics() {
        return this.metrics;
    }

    /**
     * @return {@code True} if this message contains metrics of at least one node.
     */
    public boolean hasMetrics() {
        return !this.metrics.isEmpty();
    }

    /**
     * @param nodeId Node ID.
     * @return {@code True} if this message contains metrics for the given node.
     */
    public boolean hasMetrics(UUID nodeId) {
        assert nodeId != null;

        MetricsSet set = this.metrics.get(nodeId);

        return set != null;
    }

    /**
     * Gets client node IDs carried by this message. The internal collection
     * itself is returned, not a copy.
     *
     * @return Client node IDs.
     */
    public Collection<UUID> clientNodeIds() {
        return this.clientNodeIds;
    }

    /**
     * Adds client node ID.
     *
     * @param clientNodeId Client node ID.
     */
    public void addClientNodeId(UUID clientNodeId) {
        this.clientNodeIds.add(clientNodeId);
    }

    /** {@inheritDoc} */
    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        super.writeExternal(out);

        // Entry count first, then (node ID, metrics set) pairs.
        out.writeInt(metrics.size());

        for (Map.Entry<UUID, MetricsSet> entry : metrics.entrySet()) {
            U.writeUuid(out, entry.getKey());
            out.writeObject(entry.getValue());
        }

        U.writeCollection(out, clientNodeIds);
    }

    /** {@inheritDoc} */
    @Override
    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        super.readExternal(in);

        int cnt = in.readInt();

        metrics = U.newHashMap(cnt);

        // Must mirror writeExternal: node ID is read before its metrics set.
        for (int idx = 0; idx < cnt; idx++) {
            UUID nodeId = U.readUuid(in);
            MetricsSet set = (MetricsSet)in.readObject();

            metrics.put(nodeId, set);
        }

        clientNodeIds = U.readCollection(in);
    }

    /** {@inheritDoc} */
    @Override
    public String toString() {
        String parent = super.toString();

        return S.toString(TcpDiscoveryHeartbeatMessage.class, this, "super", parent);
    }

    /**
     * Serializes metrics into a new array of {@code METRICS_SIZE} bytes.
     *
     * @param metrics Metrics.
     * @return Serialized metrics.
     */
    private static byte[] serializeMetrics(ClusterNodeMetrics metrics) {
        assert metrics != null;

        byte[] bytes = new byte[DiscoveryMetricsHelper.METRICS_SIZE];

        serialize(bytes, 0, metrics);

        return bytes;
    }

    /**
     * Serializes node ID followed by metrics: the first 16 bytes hold the UUID
     * (most significant long, then least significant long), the metrics start at offset 16.
     *
     * @param nodeId Node ID.
     * @param metrics Metrics.
     * @return Serialized metrics.
     */
    private static byte[] serializeMetrics(UUID nodeId, ClusterNodeMetrics metrics) {
        assert nodeId != null;
        assert metrics != null;

        byte[] bytes = new byte[16 + DiscoveryMetricsHelper.METRICS_SIZE];

        long hi = nodeId.getMostSignificantBits();
        long lo = nodeId.getLeastSignificantBits();

        U.longToBytes(hi, bytes, 0);
        U.longToBytes(lo, bytes, 8);

        serialize(bytes, 16, metrics);

        return bytes;
    }

    /**
     * Serialized metrics of one node plus, optionally, serialized metrics of client nodes.
     */
    @SuppressWarnings("PublicInnerClass")
    public static class MetricsSet implements Externalizable {
        /** Serialization version. */
        private static final long serialVersionUID = 0L;

        /** Serialized metrics. */
        private byte[] metrics;

        /** Serialized client metrics (created on first addition, {@code null} until then). */
        private Collection<byte[]> clientMetrics;

        /**
         * No-arg constructor required by {@link Externalizable}.
         */
        public MetricsSet() {
            // No-op.
        }

        /**
         * @param metrics Metrics to store in serialized form.
         */
        public MetricsSet(ClusterNodeMetrics metrics) {
            assert metrics != null;

            this.metrics = serializeMetrics(metrics);
        }

        /**
         * @return Metrics deserialized from the stored bytes.
         */
        public ClusterNodeMetrics metrics() {
            return deserialize(this.metrics, 0);
        }

        /**
         * @return Client metrics as pairs of client node ID and deserialized metrics,
         *      produced through {@code F.viewReadOnly} over the stored byte arrays.
         */
        public Collection<T2<UUID, ClusterNodeMetrics>> clientMetrics() {
            C1<byte[], T2<UUID, ClusterNodeMetrics>> decoder = new C1<byte[], T2<UUID, ClusterNodeMetrics>>() {
                @Override
                public T2<UUID, ClusterNodeMetrics> apply(byte[] bytes) {
                    // Layout matches serializeMetrics(UUID, ClusterNodeMetrics).
                    long hi = U.bytesToLong(bytes, 0);
                    long lo = U.bytesToLong(bytes, 8);

                    UUID clientId = new UUID(hi, lo);

                    return new T2<>(clientId, deserialize(bytes, 16));
                }
            };

            return F.viewReadOnly(clientMetrics, decoder);
        }

        /**
         * @param nodeId Client node ID.
         * @param metrics Client metrics.
         */
        private void addClientMetrics(UUID nodeId, ClusterNodeMetrics metrics) {
            assert nodeId != null;
            assert metrics != null;

            if (clientMetrics == null) {
                clientMetrics = new ArrayList<>();
            }

            byte[] bytes = serializeMetrics(nodeId, metrics);

            clientMetrics.add(bytes);
        }

        /** {@inheritDoc} */
        @Override
        public void writeExternal(ObjectOutput out) throws IOException {
            U.writeByteArray(out, metrics);

            // -1 marks an absent client metrics collection.
            if (clientMetrics == null) {
                out.writeInt(-1);

                return;
            }

            out.writeInt(clientMetrics.size());

            for (byte[] bytes : clientMetrics) {
                U.writeByteArray(out, bytes);
            }
        }

        /** {@inheritDoc} */
        @Override
        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
            metrics = U.readByteArray(in);

            int cnt = in.readInt();

            // Negative size means the collection was absent on the sending side.
            if (cnt < 0) {
                return;
            }

            clientMetrics = new ArrayList<>(cnt);

            for (int left = cnt; left > 0; left--) {
                clientMetrics.add(U.readByteArray(in));
            }
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cdb10bc1/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java new file mode 100644 index 0000000..2546aeb --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java @@ -0,0 +1,102 @@
/* @java.file.header */

/* _________ _____ __________________ _____
 * __ ____/___________(_)______ /__ ____/______ ____(_)_______
 * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
 * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
 * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
 */

package org.gridgain.grid.spi.discovery.tcp.messages;

import org.gridgain.grid.spi.discovery.tcp.internal.*;
import org.gridgain.grid.util.typedef.internal.*;

import java.io.*;
import java.util.*;

/**
 * Initial message sent by a node that wants to enter the topology.
 * It is sent to a random node during SPI start and then forwarded directly to the coordinator.
 */
public class TcpDiscoveryJoinRequestMessage extends TcpDiscoveryAbstractMessage {
    /** Serialization version. */
    private static final long serialVersionUID = 0L;

    /** Node that asks to join the topology. */
    private TcpDiscoveryNode node;

    /** Discovery data. */
    private List<Object> discoData;

    /**
     * No-arg constructor required by {@link Externalizable}.
     */
    public TcpDiscoveryJoinRequestMessage() {
        // No-op.
    }

    /**
     * Creates a join request; the ID of {@code node} is used as the creator node ID.
     *
     * @param node New node that wants to join.
     * @param discoData Discovery data.
     */
    public TcpDiscoveryJoinRequestMessage(TcpDiscoveryNode node, List<Object> discoData) {
        super(node.id());

        this.discoData = discoData;
        this.node = node;
    }

    /**
     * Gets the node that wants to join the topology.
     *
     * @return Node that wants to join the topology.
     */
    public TcpDiscoveryNode node() {
        return this.node;
    }

    /**
     * @return Discovery data.
     */
    public List<Object> discoveryData() {
        return this.discoData;
    }

    /**
     * @return Current value of the flag stored at {@code RESPONDED_FLAG_POS}.
     */
    public boolean responded() {
        boolean flag = getFlag(RESPONDED_FLAG_POS);

        return flag;
    }

    /**
     * @param responded New value of the flag stored at {@code RESPONDED_FLAG_POS}.
     */
    public void responded(boolean responded) {
        setFlag(RESPONDED_FLAG_POS, responded);
    }

    /** {@inheritDoc} */
    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        super.writeExternal(out);

        // Node first, discovery data second; readExternal relies on this order.
        out.writeObject(this.node);
        U.writeCollection(out, this.discoData);
    }

    /** {@inheritDoc} */
    @Override
    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        super.readExternal(in);

        Object readNode = in.readObject();

        this.node = (TcpDiscoveryNode)readNode;
        this.discoData = U.readList(in);
    }

    /** {@inheritDoc} */
    @Override
    public String toString() {
        String parent = super.toString();

        return S.toString(TcpDiscoveryJoinRequestMessage.class, this, "super", parent);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cdb10bc1/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryLoopbackProblemMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryLoopbackProblemMessage.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryLoopbackProblemMessage.java new file mode 100644 index 0000000..fce327b --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryLoopbackProblemMessage.java @@ -0,0 +1,87 @@ +/* 
@java.file.header */

/* _________ _____ __________________ _____
 * __ ____/___________(_)______ /__ ____/______ ____(_)_______
 * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
 * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
 * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
 */

package org.gridgain.grid.spi.discovery.tcp.messages;

import org.gridgain.grid.util.typedef.internal.*;

import java.io.*;
import java.util.*;

/**
 * Message telling a joining node that it has a loopback problem (misconfiguration):
 * the remote node is configured to use a loopback address while the joining node
 * is not, or vice versa.
 */
public class TcpDiscoveryLoopbackProblemMessage extends TcpDiscoveryAbstractMessage {
    /** Serialization version. */
    private static final long serialVersionUID = 0L;

    /** Remote node addresses. */
    private Collection<String> addrs;

    /** Remote node host names. */
    private Collection<String> hostNames;

    /**
     * No-arg constructor required by {@link Externalizable}.
     */
    public TcpDiscoveryLoopbackProblemMessage() {
        // No-op.
    }

    /**
     * Creates the message. Both collections are stored by reference, not copied.
     *
     * @param creatorNodeId Creator node ID.
     * @param addrs Remote node addresses.
     * @param hostNames Remote node host names.
     */
    public TcpDiscoveryLoopbackProblemMessage(
        UUID creatorNodeId,
        Collection<String> addrs,
        Collection<String> hostNames
    ) {
        super(creatorNodeId);

        this.hostNames = hostNames;
        this.addrs = addrs;
    }

    /**
     * @return Remote node addresses.
     */
    public Collection<String> addresses() {
        return this.addrs;
    }

    /**
     * @return Remote node host names.
     */
    public Collection<String> hostNames() {
        return this.hostNames;
    }

    /** {@inheritDoc} */
    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        super.writeExternal(out);

        // Addresses first, host names second; readExternal relies on this order.
        U.writeCollection(out, this.addrs);
        U.writeCollection(out, this.hostNames);
    }

    /** {@inheritDoc} */
    @Override
    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        super.readExternal(in);

        this.addrs = U.readCollection(in);
        this.hostNames = U.readCollection(in);
    }

    /** {@inheritDoc} */
    @Override
    public String toString() {
        String parent = super.toString();

        return S.toString(TcpDiscoveryLoopbackProblemMessage.class, this, "super", parent);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cdb10bc1/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java new file mode 100644 index 0000000..ddb4ae7 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java @@ -0,0 +1,75 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.spi.discovery.tcp.messages; + +import org.gridgain.grid.util.typedef.internal.*; + +import java.io.*; +import java.util.*; + +/** + * Sent by coordinator across the ring to finish node add process. 
+ */ +@TcpDiscoveryEnsureDelivery +@TcpDiscoveryRedirectToClient +public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** Added node ID. */ + private UUID nodeId; + + /** + * Public default no-arg constructor for {@link Externalizable} interface. + */ + public TcpDiscoveryNodeAddFinishedMessage() { + // No-op. + } + + /** + * Constructor. + * + * @param creatorNodeId ID of the creator node (coordinator). + * @param nodeId Added node ID. + */ + public TcpDiscoveryNodeAddFinishedMessage(UUID creatorNodeId, UUID nodeId) { + super(creatorNodeId); + + this.nodeId = nodeId; + } + + /** + * Gets ID of the node added. + * + * @return ID of the node added. + */ + public UUID nodeId() { + return nodeId; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + super.writeExternal(out); + + U.writeUuid(out, nodeId); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + super.readExternal(in); + + nodeId = U.readUuid(in); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TcpDiscoveryNodeAddFinishedMessage.class, this, "super", super.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cdb10bc1/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java new file mode 100644 index 0000000..0e9317f --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java @@ -0,0 +1,246 @@ +/* 
@java.file.header */

/* _________ _____ __________________ _____
 * __ ____/___________(_)______ /__ ____/______ ____(_)_______
 * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
 * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
 * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
 */

package org.gridgain.grid.spi.discovery.tcp.messages;

import org.apache.ignite.cluster.*;
import org.apache.ignite.lang.*;
import org.gridgain.grid.spi.discovery.tcp.internal.*;
import org.gridgain.grid.util.typedef.internal.*;
import org.gridgain.grid.util.tostring.*;
import org.jetbrains.annotations.*;

import java.io.*;
import java.util.*;

/**
 * Message telling nodes that a new node should be added to the topology.
 * When the newly added node receives the message it connects to its next node
 * and finishes the join process.
 */
@TcpDiscoveryEnsureDelivery
@TcpDiscoveryRedirectToClient
public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
    /** Serialization version. */
    private static final long serialVersionUID = 0L;

    /** Added node. */
    private TcpDiscoveryNode node;

    /** Pending messages from previous node. */
    private Collection<TcpDiscoveryAbstractMessage> msgs;

    /** Discarded message ID. */
    private IgniteUuid discardMsgId;

    /** Current topology. Initialized by coordinator. */
    @GridToStringInclude
    private Collection<TcpDiscoveryNode> top;

    /** Topology snapshots history. */
    private Map<Long, Collection<ClusterNode>> topHist;

    /** Discovery data from new node. */
    private List<Object> newNodeDiscoData;

    /** Discovery data from old nodes ({@code null} after {@link #clearDiscoveryData()}). */
    private Collection<List<Object>> oldNodesDiscoData;

    /** Start time of the first grid node. */
    private long gridStartTime;

    /**
     * No-arg constructor required by {@link Externalizable}.
     */
    public TcpDiscoveryNodeAddedMessage() {
        // No-op.
    }

    /**
     * Creates the message with an empty collection of old nodes discovery data.
     *
     * @param creatorNodeId Creator node ID.
     * @param node Node to add to topology (must not be {@code null}, asserted).
     * @param newNodeDiscoData New Node discovery data.
     * @param gridStartTime Start time of the first grid node (must be positive, asserted).
     */
    public TcpDiscoveryNodeAddedMessage(
        UUID creatorNodeId,
        TcpDiscoveryNode node,
        List<Object> newNodeDiscoData,
        long gridStartTime
    ) {
        super(creatorNodeId);

        assert node != null;
        assert gridStartTime > 0;

        this.node = node;
        this.newNodeDiscoData = newNodeDiscoData;
        this.gridStartTime = gridStartTime;
        this.oldNodesDiscoData = new LinkedList<>();
    }

    /**
     * Gets newly added node.
     *
     * @return New node.
     */
    public TcpDiscoveryNode node() {
        return this.node;
    }

    /**
     * Gets pending messages sent to the new node by its previous node.
     *
     * @return Pending messages from previous node.
     */
    @Nullable public Collection<TcpDiscoveryAbstractMessage> messages() {
        return this.msgs;
    }

    /**
     * Gets discarded message ID.
     *
     * @return Discarded message ID.
     */
    @Nullable public IgniteUuid discardedMessageId() {
        return this.discardMsgId;
    }

    /**
     * Sets pending messages to send to the new node together with the discarded message ID.
     *
     * @param msgs Pending messages to send to new node.
     * @param discardMsgId Discarded message ID.
     */
    public void messages(@Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable IgniteUuid discardMsgId) {
        this.discardMsgId = discardMsgId;
        this.msgs = msgs;
    }

    /**
     * Gets topology.
     *
     * @return Current topology.
     */
    @Nullable public Collection<TcpDiscoveryNode> topology() {
        return this.top;
    }

    /**
     * Sets topology.
     *
     * @param top Current topology.
     */
    public void topology(@Nullable Collection<TcpDiscoveryNode> top) {
        this.top = top;
    }

    /**
     * Gets topology snapshots history.
     *
     * @return Map with topology snapshots history.
     */
    @Nullable public Map<Long, Collection<ClusterNode>> topologyHistory() {
        return this.topHist;
    }

    /**
     * Sets topology snapshots history.
     *
     * @param topHist Map with topology snapshots history.
     */
    public void topologyHistory(@Nullable Map<Long, Collection<ClusterNode>> topHist) {
        this.topHist = topHist;
    }

    /**
     * @return Discovery data from new node.
     */
    public List<Object> newNodeDiscoveryData() {
        return this.newNodeDiscoData;
    }

    /**
     * @return Discovery data from old nodes.
     */
    public Collection<List<Object>> oldNodesDiscoveryData() {
        return this.oldNodesDiscoData;
    }

    /**
     * Appends discovery data of an old node; does nothing once the data has been cleared.
     *
     * @param discoData Discovery data to add.
     */
    public void addDiscoveryData(List<Object> discoData) {
        // Old nodes disco data may be null if message
        // makes more than 1 pass due to stopping of the nodes in topology.
        if (oldNodesDiscoData == null) {
            return;
        }

        oldNodesDiscoData.add(discoData);
    }

    /**
     * Clears discovery data (both new node and old nodes) to minimize message size.
     */
    public void clearDiscoveryData() {
        this.oldNodesDiscoData = null;
        this.newNodeDiscoData = null;
    }

    /**
     * @return First grid node start time.
     */
    public long gridStartTime() {
        return this.gridStartTime;
    }

    /** {@inheritDoc} */
    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        super.writeExternal(out);

        out.writeObject(node);
        U.writeCollection(out, msgs);
        U.writeGridUuid(out, discardMsgId);
        U.writeCollection(out, top);
        U.writeMap(out, topHist);
        out.writeLong(gridStartTime);
        U.writeCollection(out, newNodeDiscoData);

        // -1 marks cleared (null) old nodes discovery data.
        if (oldNodesDiscoData == null) {
            out.writeInt(-1);

            return;
        }

        out.writeInt(oldNodesDiscoData.size());

        for (List<Object> nodeData : oldNodesDiscoData) {
            U.writeCollection(out, nodeData);
        }
    }

    /** {@inheritDoc} */
    @Override
    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        super.readExternal(in);

        // Read order mirrors writeExternal.
        node = (TcpDiscoveryNode)in.readObject();
        msgs = U.readCollection(in);
        discardMsgId = U.readGridUuid(in);
        top = U.readCollection(in);
        topHist = U.readTreeMap(in);
        gridStartTime = in.readLong();
        newNodeDiscoData = U.readList(in);

        int cnt = in.readInt();

        // Negative size means the data was cleared on the sending side.
        if (cnt < 0) {
            return;
        }

        oldNodesDiscoData = new ArrayList<>(cnt);

        for (int left = cnt; left > 0; left--) {
            oldNodesDiscoData.add(U.readList(in));
        }
    }

    /** {@inheritDoc} */
    @Override
    public String toString() {
        String parent = super.toString();

        return S.toString(TcpDiscoveryNodeAddedMessage.class, this, "super", parent);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cdb10bc1/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryNodeFailedMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryNodeFailedMessage.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryNodeFailedMessage.java new file mode 100644 index 0000000..8a40122 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryNodeFailedMessage.java @@ -0,0 +1,93 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.spi.discovery.tcp.messages; + +import 
org.gridgain.grid.util.typedef.internal.*;

import java.io.*;
import java.util.*;

/**
 * Sent to the coordinator across the ring by the node that has detected a node failure,
 * then sent by the coordinator across the ring.
 */
@TcpDiscoveryEnsureDelivery
@TcpDiscoveryRedirectToClient
public class TcpDiscoveryNodeFailedMessage extends TcpDiscoveryAbstractMessage {
    /** Serialization version. */
    private static final long serialVersionUID = 0L;

    /** ID of the failed node. */
    private UUID failedNodeId;

    /** Internal order of the failed node. */
    private long order;

    /**
     * No-arg constructor required by {@link Externalizable}.
     */
    public TcpDiscoveryNodeFailedMessage() {
        // No-op.
    }

    /**
     * Creates the message.
     *
     * @param creatorNodeId ID of the node that detected the failure.
     * @param failedNodeId ID of the failed node (must not be {@code null}, asserted).
     * @param order Order of the failed node (must be positive, asserted).
     */
    public TcpDiscoveryNodeFailedMessage(UUID creatorNodeId, UUID failedNodeId, long order) {
        super(creatorNodeId);

        assert failedNodeId != null;
        assert order > 0;

        this.order = order;
        this.failedNodeId = failedNodeId;
    }

    /**
     * Gets ID of the failed node.
     *
     * @return ID of the failed node.
     */
    public UUID failedNodeId() {
        return this.failedNodeId;
    }

    /**
     * @return Internal order of the failed node.
     */
    public long order() {
        return this.order;
    }

    /** {@inheritDoc} */
    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        super.writeExternal(out);

        // Failed node ID first, order second; readExternal relies on this order.
        U.writeUuid(out, this.failedNodeId);
        out.writeLong(this.order);
    }

    /** {@inheritDoc} */
    @Override
    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        super.readExternal(in);

        this.failedNodeId = U.readUuid(in);
        this.order = in.readLong();
    }

    /** {@inheritDoc} */
    @Override
    public String toString() {
        String parent = super.toString();

        return S.toString(TcpDiscoveryNodeFailedMessage.class, this, "super", parent);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cdb10bc1/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java new file mode 100644 index 0000000..2e49505 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java @@ -0,0 +1,47 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.spi.discovery.tcp.messages; + +import org.gridgain.grid.util.typedef.internal.*; + +import java.io.*; +import java.util.*; + +/** + * Sent by node that is stopping to coordinator across the ring, + * then sent by coordinator across the ring. 
+ */ +@TcpDiscoveryEnsureDelivery +@TcpDiscoveryRedirectToClient +public class TcpDiscoveryNodeLeftMessage extends TcpDiscoveryAbstractMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Public default no-arg constructor for {@link Externalizable} interface. + */ + public TcpDiscoveryNodeLeftMessage() { + // No-op. + } + + /** + * Constructor. + * + * @param creatorNodeId ID of the node that is about to leave the topology. + */ + public TcpDiscoveryNodeLeftMessage(UUID creatorNodeId) { + super(creatorNodeId); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TcpDiscoveryNodeLeftMessage.class, this, "super", super.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cdb10bc1/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryPingRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryPingRequest.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryPingRequest.java new file mode 100644 index 0000000..e2e5f51 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryPingRequest.java @@ -0,0 +1,65 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.spi.discovery.tcp.messages; + +import org.gridgain.grid.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; + +/** + * Ping request. 
+ */ +public class TcpDiscoveryPingRequest extends TcpDiscoveryAbstractMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** Pinged client node ID. */ + private UUID clientNodeId; + + /** + * For {@link Externalizable}. + */ + public TcpDiscoveryPingRequest() { + // No-op. + } + + /** + * @param creatorNodeId Creator node ID. + * @param clientNodeId Pinged client node ID. + */ + public TcpDiscoveryPingRequest(UUID creatorNodeId, @Nullable UUID clientNodeId) { + super(creatorNodeId); + + this.clientNodeId = clientNodeId; + } + + /** + * @return Pinged client node ID. + */ + @Nullable public UUID clientNodeId() { + return clientNodeId; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + super.writeExternal(out); + + U.writeUuid(out, clientNodeId); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + super.readExternal(in); + + clientNodeId = U.readUuid(in); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cdb10bc1/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryPingResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryPingResponse.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryPingResponse.java new file mode 100644 index 0000000..18a5be1 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryPingResponse.java @@ -0,0 +1,66 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package 
org.gridgain.grid.spi.discovery.tcp.messages;

import java.io.*;
import java.util.*;

/**
 * Ping response.
 */
public class TcpDiscoveryPingResponse extends TcpDiscoveryAbstractMessage {
    /** Serialization version. */
    private static final long serialVersionUID = 0L;

    /** Whether pinged client exists. */
    private boolean clientExists;

    /**
     * For {@link Externalizable}.
     */
    public TcpDiscoveryPingResponse() {
        // No-op.
    }

    /**
     * @param creatorNodeId Creator node ID.
     */
    public TcpDiscoveryPingResponse(UUID creatorNodeId) {
        super(creatorNodeId);
    }

    /**
     * @param clientExists Whether pinged client exists.
     */
    public void clientExists(boolean clientExists) {
        this.clientExists = clientExists;
    }

    /**
     * @return Whether pinged client exists.
     */
    public boolean clientExists() {
        return clientExists;
    }

    /** {@inheritDoc} */
    @Override public void writeExternal(ObjectOutput out) throws IOException {
        super.writeExternal(out);

        out.writeBoolean(clientExists);
    }

    /** {@inheritDoc} */
    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        super.readExternal(in);

        clientExists = in.readBoolean();
    }

    /**
     * String form built the same way as in the other discovery messages.
     * The helper is referenced by its fully-qualified name because this file
     * does not import the {@code typedef.internal} package.
     *
     * {@inheritDoc}
     */
    @Override public String toString() {
        return org.gridgain.grid.util.typedef.internal.S.toString(
            TcpDiscoveryPingResponse.class, this, "super", super.toString());
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cdb10bc1/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryRedirectToClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryRedirectToClient.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryRedirectToClient.java new file mode 100644 index 0000000..972067b --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryRedirectToClient.java @@ -0,0 +1,23 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ 
____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.spi.discovery.tcp.messages; + +import java.lang.annotation.*; + +/** + * Message classes with this annotation attached will be + * redirected to client nodes when going through the ring. + */ +@Documented +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.TYPE) +public @interface TcpDiscoveryRedirectToClient { + // No-op. +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cdb10bc1/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java new file mode 100644 index 0000000..3481488 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java @@ -0,0 +1,123 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.spi.discovery.tcp.messages; + +import org.gridgain.grid.spi.discovery.tcp.internal.*; +import org.gridgain.grid.util.typedef.internal.*; + +import java.io.*; +import java.util.*; + +/** + * Message sent by a node to its next node to ensure that the next node and + * the connection to it are alive. The receiving node should send it across the ring + * until the message reaches the coordinator. The coordinator responds directly to the node. 
+ * <p> + * If a failed node id is specified then the message is sent across the ring up to the sender node + * to ensure that the failed node is actually failed. + */ +public class TcpDiscoveryStatusCheckMessage extends TcpDiscoveryAbstractMessage { + /** Serialization version UID. */ + private static final long serialVersionUID = 0L; + + /** Status OK. */ + public static final int STATUS_OK = 1; + + /** Status RECONNECT. */ + public static final int STATUS_RECON = 2; + + /** Creator node. */ + private TcpDiscoveryNode creatorNode; + + /** Failed node id. */ + private UUID failedNodeId; + + /** Creator node status (initialized by coordinator). */ + private int status; + + /** + * Public default no-arg constructor for {@link Externalizable} interface. + */ + public TcpDiscoveryStatusCheckMessage() { + // No-op. + } + + /** + * Constructor. + * + * @param creatorNode Creator node, must not be {@code null} (its ID is passed to the superclass constructor). + * @param failedNodeId Failed node id. + */ + public TcpDiscoveryStatusCheckMessage(TcpDiscoveryNode creatorNode, UUID failedNodeId) { + super(creatorNode.id()); + + this.creatorNode = creatorNode; + this.failedNodeId = failedNodeId; + } + + /** + * Gets creator node. + * + * @return Creator node. + */ + public TcpDiscoveryNode creatorNode() { + return creatorNode; + } + + /** + * Gets failed node id. + * + * @return Failed node id. + */ + public UUID failedNodeId() { + return failedNodeId; + } + + /** + * Gets creator status. + * + * @return Creator node status ({@code 0} until set; presumably {@link #STATUS_OK} or {@link #STATUS_RECON} afterwards). + */ + public int status() { + return status; + } + + /** + * Sets creator node status (should be set by coordinator). + * + * @param status Creator node status. 
+ */ + public void status(int status) { + this.status = status; + } + + /** {@inheritDoc} Writes the superclass state, then creator node, failed node id and status. */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + super.writeExternal(out); + + out.writeObject(creatorNode); + U.writeUuid(out, failedNodeId); + out.writeInt(status); + } + + /** {@inheritDoc} Reads the fields in the same order {@link #writeExternal(ObjectOutput)} writes them. */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + super.readExternal(in); + + creatorNode = (TcpDiscoveryNode)in.readObject(); + failedNodeId = U.readUuid(in); + status = in.readInt(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TcpDiscoveryStatusCheckMessage.class, this, "super", super.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cdb10bc1/modules/core/src/test/java/org/gridgain/grid/spi/discovery/tcp/GridTcpDiscoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/spi/discovery/tcp/GridTcpDiscoverySelfTest.java b/modules/core/src/test/java/org/gridgain/grid/spi/discovery/tcp/GridTcpDiscoverySelfTest.java index bd51479..cfb7b92 100644 --- a/modules/core/src/test/java/org/gridgain/grid/spi/discovery/tcp/GridTcpDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/spi/discovery/tcp/GridTcpDiscoverySelfTest.java @@ -958,7 +958,7 @@ public class GridTcpDiscoverySelfTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override void onBeforeMessageSentAcrossRing(Serializable msg) { - if (msg instanceof GridTcpDiscoveryNodeAddedMessage) + if (msg instanceof TcpDiscoveryNodeAddedMessage) if (++i == 2) { simulateNodeFailure(); @@ -973,7 +973,7 @@ public class GridTcpDiscoverySelfTest extends GridCommonAbstractTest { private static class FailBeforeNodeLeftSentSpi extends TcpDiscoverySpi { /** {@inheritDoc} */ @Override void onBeforeMessageSentAcrossRing(Serializable msg) { - if (msg instanceof 
GridTcpDiscoveryNodeLeftMessage) { + if (msg instanceof TcpDiscoveryNodeLeftMessage) { simulateNodeFailure(); throw new RuntimeException("Avoid message sending: " + msg.getClass());