Repository: incubator-ignite Updated Branches: refs/heads/ignite-1229 [created] 068b4e321
ignite-1229: stop ping process when node left topology Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/068b4e32 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/068b4e32 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/068b4e32 Branch: refs/heads/ignite-1229 Commit: 068b4e321fe20fea14a81ecd771b6a0aedcb472d Parents: ae11e9b Author: Denis Magda <dma...@gridgain.com> Authored: Tue Aug 11 12:36:44 2015 +0300 Committer: nikolay_tikhonov <ntikho...@gridgain.com> Committed: Tue Aug 11 12:53:03 2015 +0300 ---------------------------------------------------------------------- .../ignite/spi/discovery/tcp/ServerImpl.java | 101 ++++++++++++------- .../tcp/internal/IgniteNodeLeftException.java | 40 ++++++++ 2 files changed, 103 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/068b4e32/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 92c21ed..331b286 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -386,14 +386,14 @@ class ServerImpl extends TcpDiscoveryImpl { if (nodeId == getLocalNodeId()) return true; - TcpDiscoveryNode node = ring.node(nodeId); - - if (node == null || !node.visible()) + if (!nodeAlive(nodeId)) return false; + TcpDiscoveryNode node = ring.node(nodeId); + boolean res = pingNode(node); - if (!res && !node.isClient()) { + if (!res && !node.isClient() && nodeAlive(nodeId)) { LT.warn(log, null, "Failed to ping node (status check will be initiated): " + nodeId); msgWorker.addMessage(new TcpDiscoveryStatusCheckMessage(locNode, node.id())); @@ -421,14 +421,14 @@ class ServerImpl extends TcpDiscoveryImpl { node = ring.node(node.clientRouterNodeId()); - if (node == null || !node.visible()) + if (!nodeAlive(node.id())) return false; } for (InetSocketAddress addr : spi.getNodeAddresses(node, U.sameMacs(locNode, node))) { try { // ID returned by the node should be the same as ID of the parameter for ping to succeed. - IgniteBiTuple<UUID, Boolean> t = pingNode(addr, clientNodeId); + IgniteBiTuple<UUID, Boolean> t = pingNode(addr, node.id(), clientNodeId); boolean res = node.id().equals(t.get1()) && (clientNodeId == null || t.get2()); @@ -437,6 +437,14 @@ class ServerImpl extends TcpDiscoveryImpl { return res; } + catch (IgniteNodeLeftException e) { + if (log.isDebugEnabled()) + log.debug("Failed to ping node [node=" + node + ", err=" + e.getMessage() + ']'); + + onException("Failed to ping node [node=" + node + ", err=" + e.getMessage() + ']', e); + + return false; + } catch (IgniteCheckedException e) { if (log.isDebugEnabled()) log.debug("Failed to ping node [node=" + node + ", err=" + e.getMessage() + ']'); @@ -453,12 +461,13 @@ class ServerImpl extends TcpDiscoveryImpl { * Pings the node by its address to see if it's alive. * * @param addr Address of the node. + * @param nodeId Node ID to ping. In case when client node ID is not null this node ID is an ID of the router node. * @param clientNodeId Client node ID. * @return ID of the remote node and "client exists" flag if node alive. * @throws IgniteCheckedException If an error occurs. */ - private IgniteBiTuple<UUID, Boolean> pingNode(InetSocketAddress addr, @Nullable UUID clientNodeId) - throws IgniteCheckedException { + private IgniteBiTuple<UUID, Boolean> pingNode(InetSocketAddress addr, @Nullable UUID nodeId, + @Nullable UUID clientNodeId) throws IgniteCheckedException { assert addr != null; UUID locNodeId = getLocalNodeId(); @@ -537,6 +546,10 @@ class ServerImpl extends TcpDiscoveryImpl { return t; } catch (IOException | IgniteCheckedException e) { + if (nodeId != null && !nodeAlive(nodeId)) + throw new IgniteNodeLeftException("Failed to ping node (node already left or leaving" + + " the ring) [nodeId=" + nodeId + ", addr=" + addr +']', e); + if (errs == null) errs = new ArrayList<>(); @@ -615,6 +628,28 @@ class ServerImpl extends TcpDiscoveryImpl { } /** + * Checks whether a node is alive or not. + * + * @param nodeId Node ID. + * @return {@code True} if node is in the ring and is not being removed from. + */ + private boolean nodeAlive(UUID nodeId) { + // Is node alive or about to be removed from the ring? + TcpDiscoveryNode node = ring.node(nodeId); + + boolean nodeAlive = node != null && node.visible(); + + if (nodeAlive) { + synchronized (mux) { + nodeAlive = !F.transform(failedNodes, F.node2id()).contains(nodeId) && + !F.transform(leavingNodes, F.node2id()).contains(nodeId); + } + } + + return nodeAlive; + } + + /** * Tries to join this node to topology. * * @throws IgniteSpiException If any error occurs. @@ -1387,15 +1422,17 @@ class ServerImpl extends TcpDiscoveryImpl { b.append("Leaving nodes: ").append(U.nl()); - for (TcpDiscoveryNode node : leavingNodes) - b.append(" ").append(node.id()).append(U.nl()); + synchronized (mux) { + for (TcpDiscoveryNode node : leavingNodes) + b.append(" ").append(node.id()).append(U.nl()); - b.append(U.nl()); + b.append(U.nl()); - b.append("Failed nodes: ").append(U.nl()); + b.append("Failed nodes: ").append(U.nl()); - for (TcpDiscoveryNode node : failedNodes) - b.append(" ").append(node.id()).append(U.nl()); + for (TcpDiscoveryNode node : failedNodes) + b.append(" ").append(node.id()).append(U.nl()); + } b.append(U.nl()); @@ -1520,7 +1557,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (res == null) { try { - res = pingNode(addr, null).get1() != null; + res = pingNode(addr, null, null).get1() != null; } catch (IgniteCheckedException e) { if (log.isDebugEnabled()) @@ -3775,9 +3812,17 @@ class ServerImpl extends TcpDiscoveryImpl { else { int aliveCheck = clientNode.decrementAliveCheck(); - if (aliveCheck <= 0 && isLocalNodeCoordinator() && !failedNodes.contains(clientNode)) - processNodeFailedMessage(new TcpDiscoveryNodeFailedMessage(locNodeId, - clientNode.id(), clientNode.internalOrder())); + if (aliveCheck <= 0 && isLocalNodeCoordinator()) { + boolean failedNode; + + synchronized (mux) { + failedNode = failedNodes.contains(clientNode); + } + + if (!failedNode) + processNodeFailedMessage(new TcpDiscoveryNodeFailedMessage(locNodeId, + clientNode.id(), clientNode.internalOrder())); + } } } } @@ -4689,26 +4734,6 @@ class ServerImpl extends TcpDiscoveryImpl { } /** - * @param nodeId Node ID. - * @return {@code True} if node is in the ring and is not being removed from. - */ - private boolean nodeAlive(UUID nodeId) { - // Is node alive or about to be removed from the ring? - TcpDiscoveryNode node = ring.node(nodeId); - - boolean nodeAlive = node != null && node.visible(); - - if (nodeAlive) { - synchronized (mux) { - nodeAlive = !F.transform(failedNodes, F.node2id()).contains(nodeId) && - !F.transform(leavingNodes, F.node2id()).contains(nodeId); - } - } - - return nodeAlive; - } - - /** * @param msg Join request message. * @param clientMsgWrk Client message worker to start. * @return Whether connection was successful. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/068b4e32/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/IgniteNodeLeftException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/IgniteNodeLeftException.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/IgniteNodeLeftException.java new file mode 100644 index 0000000..80bcc67 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/IgniteNodeLeftException.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.internal; + +import org.apache.ignite.*; + +import org.jetbrains.annotations.*; + +/** + * Thrown when there is an attempt to talk to the node that has already left the ring. + */ +public class IgniteNodeLeftException extends IgniteCheckedException { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Creates new exception with given error message and optional nested exception. + * + * @param msg Error message. + * @param cause Optional nested exception (can be {@code null}). + */ + public IgniteNodeLeftException(String msg, @Nullable Throwable cause) { + super(msg, cause); + } +}