Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-1229 [created] 068b4e321


ignite-1229: stop ping process when node left topology


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/068b4e32
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/068b4e32
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/068b4e32

Branch: refs/heads/ignite-1229
Commit: 068b4e321fe20fea14a81ecd771b6a0aedcb472d
Parents: ae11e9b
Author: Denis Magda <dma...@gridgain.com>
Authored: Tue Aug 11 12:36:44 2015 +0300
Committer: nikolay_tikhonov <ntikho...@gridgain.com>
Committed: Tue Aug 11 12:53:03 2015 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 101 ++++++++++++-------
 .../tcp/internal/IgniteNodeLeftException.java   |  40 ++++++++
 2 files changed, 103 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/068b4e32/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 92c21ed..331b286 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -386,14 +386,14 @@ class ServerImpl extends TcpDiscoveryImpl {
         if (nodeId == getLocalNodeId())
             return true;
 
-        TcpDiscoveryNode node = ring.node(nodeId);
-
-        if (node == null || !node.visible())
+        if (!nodeAlive(nodeId))
             return false;
 
+        TcpDiscoveryNode node = ring.node(nodeId);
+
         boolean res = pingNode(node);
 
-        if (!res && !node.isClient()) {
+        if (!res && !node.isClient() && nodeAlive(nodeId)) {
             LT.warn(log, null, "Failed to ping node (status check will be 
initiated): " + nodeId);
 
             msgWorker.addMessage(new TcpDiscoveryStatusCheckMessage(locNode, 
node.id()));
@@ -421,14 +421,14 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             node = ring.node(node.clientRouterNodeId());
 
-            if (node == null || !node.visible())
+            if (!nodeAlive(node.id()))
                 return false;
         }
 
         for (InetSocketAddress addr : spi.getNodeAddresses(node, 
U.sameMacs(locNode, node))) {
             try {
                 // ID returned by the node should be the same as ID of the 
parameter for ping to succeed.
-                IgniteBiTuple<UUID, Boolean> t = pingNode(addr, clientNodeId);
+                IgniteBiTuple<UUID, Boolean> t = pingNode(addr, node.id(), 
clientNodeId);
 
                 boolean res = node.id().equals(t.get1()) && (clientNodeId == 
null || t.get2());
 
@@ -437,6 +437,14 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 return res;
             }
+            catch (IgniteNodeLeftException e) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to ping node [node=" + node + ", err=" + 
e.getMessage() + ']');
+
+                onException("Failed to ping node [node=" + node + ", err=" + 
e.getMessage() + ']', e);
+
+                return false;
+            }
             catch (IgniteCheckedException e) {
                 if (log.isDebugEnabled())
                     log.debug("Failed to ping node [node=" + node + ", err=" + 
e.getMessage() + ']');
@@ -453,12 +461,13 @@ class ServerImpl extends TcpDiscoveryImpl {
      * Pings the node by its address to see if it's alive.
      *
      * @param addr Address of the node.
+     * @param nodeId Node ID to ping. In case when client node ID is not null 
this node ID is an ID of the router node.
      * @param clientNodeId Client node ID.
      * @return ID of the remote node and "client exists" flag if node alive.
      * @throws IgniteCheckedException If an error occurs.
      */
-    private IgniteBiTuple<UUID, Boolean> pingNode(InetSocketAddress addr, 
@Nullable UUID clientNodeId)
-        throws IgniteCheckedException {
+    private IgniteBiTuple<UUID, Boolean> pingNode(InetSocketAddress addr, 
@Nullable UUID nodeId,
+        @Nullable UUID clientNodeId) throws IgniteCheckedException {
         assert addr != null;
 
         UUID locNodeId = getLocalNodeId();
@@ -537,6 +546,10 @@ class ServerImpl extends TcpDiscoveryImpl {
                         return t;
                     }
                     catch (IOException | IgniteCheckedException e) {
+                        if (nodeId != null && !nodeAlive(nodeId))
+                            throw new IgniteNodeLeftException("Failed to ping 
node (node already left or leaving" +
+                                " the ring) [nodeId=" + nodeId + ", addr=" + 
addr +']', e);
+
                         if (errs == null)
                             errs = new ArrayList<>();
 
@@ -615,6 +628,28 @@ class ServerImpl extends TcpDiscoveryImpl {
     }
 
     /**
+     * Checks whether a node is alive or not.
+     *
+     * @param nodeId Node ID.
+     * @return {@code True} if node is in the ring and is not being removed 
from.
+     */
+    private boolean nodeAlive(UUID nodeId) {
+        // Is node alive or about to be removed from the ring?
+        TcpDiscoveryNode node = ring.node(nodeId);
+
+        boolean nodeAlive = node != null && node.visible();
+
+        if (nodeAlive) {
+            synchronized (mux) {
+                nodeAlive = !F.transform(failedNodes, 
F.node2id()).contains(nodeId) &&
+                    !F.transform(leavingNodes, F.node2id()).contains(nodeId);
+            }
+        }
+
+        return nodeAlive;
+    }
+
+    /**
      * Tries to join this node to topology.
      *
      * @throws IgniteSpiException If any error occurs.
@@ -1387,15 +1422,17 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             b.append("Leaving nodes: ").append(U.nl());
 
-            for (TcpDiscoveryNode node : leavingNodes)
-                b.append("    ").append(node.id()).append(U.nl());
+            synchronized (mux) {
+                for (TcpDiscoveryNode node : leavingNodes)
+                    b.append("    ").append(node.id()).append(U.nl());
 
-            b.append(U.nl());
+                b.append(U.nl());
 
-            b.append("Failed nodes: ").append(U.nl());
+                b.append("Failed nodes: ").append(U.nl());
 
-            for (TcpDiscoveryNode node : failedNodes)
-                b.append("    ").append(node.id()).append(U.nl());
+                for (TcpDiscoveryNode node : failedNodes)
+                    b.append("    ").append(node.id()).append(U.nl());
+            }
 
             b.append(U.nl());
 
@@ -1520,7 +1557,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                         if (res == null) {
                             try {
-                                res = pingNode(addr, null).get1() != null;
+                                res = pingNode(addr, null, null).get1() != 
null;
                             }
                             catch (IgniteCheckedException e) {
                                 if (log.isDebugEnabled())
@@ -3775,9 +3812,17 @@ class ServerImpl extends TcpDiscoveryImpl {
                             else {
                                 int aliveCheck = 
clientNode.decrementAliveCheck();
 
-                                if (aliveCheck <= 0 && 
isLocalNodeCoordinator() && !failedNodes.contains(clientNode))
-                                    processNodeFailedMessage(new 
TcpDiscoveryNodeFailedMessage(locNodeId,
-                                        clientNode.id(), 
clientNode.internalOrder()));
+                                if (aliveCheck <= 0 && 
isLocalNodeCoordinator()) {
+                                    boolean failedNode;
+
+                                    synchronized (mux) {
+                                        failedNode = 
failedNodes.contains(clientNode);
+                                    }
+
+                                    if (!failedNode)
+                                        processNodeFailedMessage(new 
TcpDiscoveryNodeFailedMessage(locNodeId,
+                                            clientNode.id(), 
clientNode.internalOrder()));
+                                }
                             }
                         }
                     }
@@ -4689,26 +4734,6 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
 
         /**
-         * @param nodeId Node ID.
-         * @return {@code True} if node is in the ring and is not being 
removed from.
-         */
-        private boolean nodeAlive(UUID nodeId) {
-            // Is node alive or about to be removed from the ring?
-            TcpDiscoveryNode node = ring.node(nodeId);
-
-            boolean nodeAlive = node != null && node.visible();
-
-            if (nodeAlive) {
-                synchronized (mux) {
-                    nodeAlive = !F.transform(failedNodes, 
F.node2id()).contains(nodeId) &&
-                        !F.transform(leavingNodes, 
F.node2id()).contains(nodeId);
-                }
-            }
-
-            return nodeAlive;
-        }
-
-        /**
          * @param msg Join request message.
          * @param clientMsgWrk Client message worker to start.
          * @return Whether connection was successful.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/068b4e32/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/IgniteNodeLeftException.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/IgniteNodeLeftException.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/IgniteNodeLeftException.java
new file mode 100644
index 0000000..80bcc67
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/IgniteNodeLeftException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp.internal;
+
+import org.apache.ignite.*;
+
+import org.jetbrains.annotations.*;
+
+/**
+ * Thrown when there is an attempt to talk to the node that has already left 
the ring.
+ */
+public class IgniteNodeLeftException extends IgniteCheckedException {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Creates new exception with given error message and optional nested 
exception.
+     *
+     * @param msg Error message.
+     * @param cause Optional nested exception (can be {@code null}).
+     */
+    public IgniteNodeLeftException(String msg, @Nullable Throwable cause) {
+        super(msg, cause);
+    }
+}

Reply via email to