Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-sprint-4 0eadae95a -> f4dac4ed5


IGNITE-714 - Fail-fast node failure detection


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ef12ba60
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ef12ba60
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ef12ba60

Branch: refs/heads/ignite-sprint-4
Commit: ef12ba6007a1938abf64a97b2f020e2390b6a8e6
Parents: bad0161
Author: Valentin Kulichenko <vkuliche...@gridgain.com>
Authored: Thu Apr 9 21:18:29 2015 -0700
Committer: Valentin Kulichenko <vkuliche...@gridgain.com>
Committed: Thu Apr 9 21:18:29 2015 -0700

----------------------------------------------------------------------
 modules/core/pom.xml                            |   2 +-
 .../internal/managers/GridManagerAdapter.java   |   4 +
 .../discovery/GridDiscoveryManager.java         |  14 +++
 .../org/apache/ignite/spi/IgniteSpiAdapter.java |   5 +
 .../org/apache/ignite/spi/IgniteSpiContext.java |   6 +
 .../communication/tcp/TcpCommunicationSpi.java  |  47 ++++++--
 .../ignite/spi/discovery/DiscoverySpi.java      |   7 ++
 .../discovery/tcp/TcpClientDiscoverySpi.java    |   5 +
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |  12 ++
 .../ignite/internal/GridFailFastSelfTest.java   | 112 +++++++++++++++++++
 .../testframework/GridSpiTestContext.java       |   5 +
 .../ignite/testsuites/IgniteBasicTestSuite.java |   1 +
 12 files changed, 209 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ef12ba60/modules/core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/core/pom.xml b/modules/core/pom.xml
index 466ccb9..41444c8 100644
--- a/modules/core/pom.xml
+++ b/modules/core/pom.xml
@@ -9,7 +9,7 @@
   the License.  You may obtain a copy of the License at
 
        http://www.apache.org/licenses/LICENSE-2.0
-
+IGNITE-714 -
   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ef12ba60/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index c478d28..90031c4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -536,6 +536,10 @@ public abstract class GridManagerAdapter<T extends 
IgniteSpi> implements GridMan
                         return ctx.io().messageFactory();
                     }
 
+                    @Override public boolean tryFailNode(UUID nodeId) {
+                        return ctx.discovery().tryFailNode(nodeId);
+                    }
+
                     /**
                      * @param e Exception to handle.
                      * @return GridSpiException Converted exception.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ef12ba60/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 04ff423..856c523 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -1335,6 +1335,20 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
     }
 
     /**
+     * @param nodeId Node ID.
+     * @return {@code True} if the node did not respond to ping and was failed.
+     */
+    public boolean tryFailNode(UUID nodeId) {
+        if (!getSpi().pingNode(nodeId)) {
+            getSpi().failNode(nodeId);
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
      * Updates topology version if current version is smaller than updated.
      *
      * @param updated Updated topology version.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ef12ba60/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java 
b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index f198210..7802d17 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -723,5 +723,10 @@ public abstract class IgniteSpiAdapter implements 
IgniteSpi, IgniteSpiManagement
         @Override public boolean isStopping() {
             return stopping;
         }
+
+        /** {@inheritDoc} */
+        @Override public boolean tryFailNode(UUID nodeId) {
+            return false;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ef12ba60/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java 
b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
index 5eef37a..e203387 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
@@ -351,4 +351,10 @@ public interface IgniteSpiContext {
      * @return {@code True} if node started shutdown sequence.
      */
     public boolean isStopping();
+
+    /**
+     * @param nodeId Node ID.
+     * @return {@code True} if the node was failed.
+     */
+    public boolean tryFailNode(UUID nodeId);
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ef12ba60/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index ff84e5b..053f121 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -258,20 +258,24 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         rmv.forceClose();
 
                         if (!isNodeStopping()) {
-                            GridNioRecoveryDescriptor recoveryData = 
ses.recoveryDescriptor();
+                            boolean failed = getSpiContext().tryFailNode(id);
 
-                            if (recoveryData != null) {
-                                if 
(recoveryData.nodeAlive(getSpiContext().node(id))) {
-                                    if 
(!recoveryData.messagesFutures().isEmpty()) {
-                                        if (log.isDebugEnabled())
-                                            log.debug("Session was closed but 
there are unacknowledged messages, " +
-                                                "will try to reconnect 
[rmtNode=" + recoveryData.node().id() + ']');
+                            if (!failed) {
+                                GridNioRecoveryDescriptor recoveryData = 
ses.recoveryDescriptor();
 
-                                        
recoveryWorker.addReconnectRequest(recoveryData);
+                                if (recoveryData != null) {
+                                    if 
(recoveryData.nodeAlive(getSpiContext().node(id))) {
+                                        if 
(!recoveryData.messagesFutures().isEmpty()) {
+                                            if (log.isDebugEnabled())
+                                                log.debug("Session was closed 
but there are unacknowledged messages, " +
+                                                    "will try to reconnect 
[rmtNode=" + recoveryData.node().id() + ']');
+
+                                            
recoveryWorker.addReconnectRequest(recoveryData);
+                                        }
                                     }
+                                    else
+                                        recoveryData.onNodeLeft();
                                 }
-                                else
-                                    recoveryData.onNodeLeft();
                             }
                         }
                     }
@@ -2054,6 +2058,29 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     }
 
     /**
+     * Stops service threads to simulate node failure.
+     *
+     * FOR TEST PURPOSES ONLY!!!
+     */
+    void simulateNodeFailure() {
+        if (nioSrvr != null)
+            nioSrvr.stop();
+
+        U.interrupt(idleClientWorker);
+        U.interrupt(clientFlushWorker);
+        U.interrupt(sockTimeoutWorker);
+        U.interrupt(recoveryWorker);
+
+        U.join(idleClientWorker, log);
+        U.join(clientFlushWorker, log);
+        U.join(sockTimeoutWorker, log);
+        U.join(recoveryWorker, log);
+
+        for (GridCommunicationClient client : clients.values())
+            client.forceClose();
+    }
+
+    /**
      * @param node Node.
      * @return Recovery receive data for given node.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ef12ba60/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
index 42bad22..7560999 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
@@ -144,4 +144,11 @@ public interface DiscoverySpi extends IgniteSpi {
      * @param evt Event.
      */
     public void sendCustomEvent(Serializable evt);
+
+    /**
+     * Initiates failure of provided node.
+     *
+     * @param nodeId Node ID.
+     */
+    public void failNode(UUID nodeId);
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ef12ba60/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
index bf69efb..8b3113f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
@@ -379,6 +379,11 @@ public class TcpClientDiscoverySpi extends 
TcpDiscoverySpiAdapter implements Tcp
         throw new UnsupportedOperationException();
     }
 
+    /** {@inheritDoc} */
+    @Override public void failNode(UUID nodeId) {
+        // No-op.
+    }
+
     /**
      * @param recon Reconnect flag.
      * @return Whether joined successfully.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ef12ba60/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index bad8837..1c09711 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1249,6 +1249,18 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
         msgWorker.addMessage(new 
TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt));
     }
 
+    /** {@inheritDoc} */
+    @Override public void failNode(UUID nodeId) {
+        ClusterNode node = ring.node(nodeId);
+
+        if (node != null) {
+            TcpDiscoveryNodeFailedMessage msg = new 
TcpDiscoveryNodeFailedMessage(getLocalNodeId(),
+                node.id(), node.order());
+
+            msgWorker.addMessage(msg);
+        }
+    }
+
     /**
      * Tries to join this node to topology.
      *

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ef12ba60/modules/core/src/test/java/org/apache/ignite/internal/GridFailFastSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/GridFailFastSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/GridFailFastSelfTest.java
new file mode 100644
index 0000000..bc4de65
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/GridFailFastSelfTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.messaging.*;
+import org.apache.ignite.spi.communication.*;
+import org.apache.ignite.spi.discovery.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Fail fast test.
+ */
+public class GridFailFastSelfTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(IP_FINDER);
+        disco.setHeartbeatFrequency(10000);
+
+        cfg.setDiscoverySpi(disco);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testFailFast() throws Exception {
+        startGridsMultiThreaded(5);
+
+        final CountDownLatch failLatch = new CountDownLatch(4);
+
+        for (int i = 0; i < 5; i++) {
+            ignite(i).events().localListen(new IgnitePredicate<Event>() {
+                @Override public boolean apply(Event evt) {
+                    info(evt.shortDisplay());
+
+                    failLatch.countDown();
+
+                    return true;
+                }
+            }, EventType.EVT_NODE_FAILED);
+        }
+
+        Ignite ignite1 = ignite(0);
+        Ignite ignite2 = ignite(1);
+
+        ignite1.message().localListen(null, new MessagingListenActor<Object>() 
{
+            @Override protected void receive(UUID nodeId, Object rcvMsg) 
throws Throwable {
+                respond(rcvMsg);
+            }
+        });
+
+        ignite2.message().localListen(null, new MessagingListenActor<Object>() 
{
+            @Override protected void receive(UUID nodeId, Object rcvMsg) 
throws Throwable {
+                respond(rcvMsg);
+            }
+        });
+
+        ignite1.message(ignite1.cluster().forRemotes()).send(null, "Message");
+
+        failNode(ignite1);
+
+        assert failLatch.await(500, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * @param ignite Ignite.
+     * @throws Exception In case of error.
+     */
+    private void failNode(Ignite ignite) throws Exception {
+        DiscoverySpi disco = ignite.configuration().getDiscoverySpi();
+
+        U.invoke(disco.getClass(), disco, "simulateNodeFailure");
+
+        CommunicationSpi comm = ignite.configuration().getCommunicationSpi();
+
+        U.invoke(comm.getClass(), comm, "simulateNodeFailure");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ef12ba60/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
 
b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index a4e011a..c0fe759 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -519,6 +519,11 @@ public class GridSpiTestContext implements 
IgniteSpiContext {
         return false;
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean tryFailNode(UUID nodeId) {
+        return false;
+    }
+
     /**
      * @param cacheName Cache name.
      * @return Map representing cache.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ef12ba60/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index 7ae237f..081de2f 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -69,6 +69,7 @@ public class IgniteBasicTestSuite extends TestSuite {
         suite.addTestSuite(GridSuppressedExceptionSelfTest.class);
         suite.addTestSuite(GridLifecycleAwareSelfTest.class);
         suite.addTestSuite(GridMessageListenSelfTest.class);
+        suite.addTestSuite(GridFailFastSelfTest.class);
 
         return suite;
     }

Reply via email to