http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31ab0dd3/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java
new file mode 100644
index 0000000..fbea187
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.io.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.discovery.*;
+import org.apache.ignite.spi.discovery.tcp.internal.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.spi.discovery.tcp.messages.*;
+
+import java.io.*;
+import java.net.*;
+
+/**
+ *
+ */
+public class TcpDiscoverySpiFailureTimeoutSelfTest extends 
AbstractDiscoverySelfTest {
+    /** */
+    private static final int SPI_COUNT = 6;
+
+    /** */
+    private TcpDiscoveryIpFinder ipFinder =  new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected int getSpiCount() {
+        return SPI_COUNT;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected DiscoverySpi getSpi(int idx) {
+        TestTcpDiscoverySpi spi = new TestTcpDiscoverySpi();
+
+        spi.setMetricsProvider(createMetricsProvider());
+        spi.setIpFinder(ipFinder);
+
+        switch (idx) {
+            case 0:
+            case 1:
+                // Ignore
+                break;
+            case 2:
+                spi.setAckTimeout(3000);
+                break;
+            case 3:
+                spi.setSocketTimeout(4000);
+                break;
+            case 4:
+                spi.setReconnectCount(4);
+                break;
+            case 5:
+                spi.setMaxAckTimeout(10000);
+                break;
+            default:
+                assert false;
+        }
+
+        return spi;
+    }
+
+    /**
+     * @throws Exception In case of error.
+     */
+    public void testFailureDetectionTimeoutEnabled() throws Exception {
+        assertTrue(firstSpi().failureDetectionTimeoutEnabled());
+        assertTrue(secondSpi().failureDetectionTimeoutEnabled());
+
+        
assertEquals(IgniteConfiguration.DFLT_FAILURE_DETECTION_TIMEOUT.longValue(),
+                firstSpi().failureDetectionTimeout());
+        
assertEquals(IgniteConfiguration.DFLT_FAILURE_DETECTION_TIMEOUT.longValue(),
+                secondSpi().failureDetectionTimeout());
+    }
+
+    /**
+     * @throws Exception In case of error.
+     */
+    public void testFailureDetectionTimeoutDisabled() throws Exception {
+        for (int i = 2; i < spis.size(); i++) {
+            
assertFalse(((TcpDiscoverySpi)spis.get(i)).failureDetectionTimeoutEnabled());
+            assertEquals(0, 
((TcpDiscoverySpi)spis.get(i)).failureDetectionTimeout());
+        }
+    }
+
+    /**
+     * @throws Exception In case of error.
+     */
+    public void testFailureDetectionOnSocketOpen() throws Exception {
+        try {
+            ClusterNode node = secondSpi().getLocalNode();
+
+            firstSpi().openSocketTimeout = true;
+
+            assertFalse(firstSpi().pingNode(node.id()));
+            assertTrue(firstSpi().validTimeout);
+            assertTrue(firstSpi().err.getMessage().equals("Timeout: 
openSocketTimeout"));
+
+            firstSpi().openSocketTimeout = false;
+            firstSpi().openSocketTimeoutWait = true;
+
+            assertFalse(firstSpi().pingNode(node.id()));
+            assertTrue(firstSpi().validTimeout);
+            assertTrue(firstSpi().err.getMessage().equals("Timeout: 
openSocketTimeoutWait"));
+        }
+        finally {
+            firstSpi().resetState();
+        }
+    }
+
+
+    /**
+     * @throws Exception In case of error.
+     */
+    public void testFailureDetectionOnSocketWrite() throws Exception {
+        try {
+            ClusterNode node = secondSpi().getLocalNode();
+
+            firstSpi().writeToSocketTimeoutWait = true;
+
+            assertFalse(firstSpi().pingNode(node.id()));
+            assertTrue(firstSpi().validTimeout);
+
+            firstSpi().writeToSocketTimeoutWait = false;
+
+            assertTrue(firstSpi().pingNode(node.id()));
+            assertTrue(firstSpi().validTimeout);
+        }
+        finally {
+            firstSpi().resetState();
+        }
+    }
+
+    /**
+     * @throws Exception In case of error.
+     */
+    public void testConnectionCheckMessage() throws Exception {
+        TestTcpDiscoverySpi nextSpi = null;
+
+        try {
+            assert firstSpi().connCheckStatusMsgCntSent == 0;
+
+            TcpDiscoveryNode nextNode = 
((ServerImpl)(firstSpi().impl)).ring().nextNode();
+
+            assertNotNull(nextNode);
+
+            nextSpi = null;
+
+            for (int i = 1; i < spis.size(); i++)
+                if (spis.get(i).getLocalNode().id().equals(nextNode.id())) {
+                    nextSpi = (TestTcpDiscoverySpi)spis.get(i);
+                    break;
+                }
+
+            assertNotNull(nextSpi);
+
+            assert nextSpi.connCheckStatusMsgCntReceived == 0;
+
+            firstSpi().countConnCheckMsg = true;
+            nextSpi.countConnCheckMsg = true;
+
+            Thread.sleep(firstSpi().failureDetectionTimeout());
+
+            firstSpi().countConnCheckMsg = false;
+            nextSpi.countConnCheckMsg = false;
+
+            int sent = firstSpi().connCheckStatusMsgCntSent;
+            int received = nextSpi.connCheckStatusMsgCntReceived;
+
+            assert sent >= 3 && sent < 7 : "messages sent: " + sent;
+            assert received >= 3 && received < 7 : "messages received: " + 
received;
+        }
+        finally {
+            firstSpi().resetState();
+
+            if (nextSpi != null)
+                nextSpi.resetState();
+        }
+    }
+
+    /**
+     * @throws Exception In case of error.
+     */
+    public void testConnectionCheckMessageBackwardCompatibility() throws 
Exception {
+        TestTcpDiscoverySpi nextSpi = null;
+        TcpDiscoveryNode nextNode = null;
+
+        IgniteProductVersion nextNodeVer = null;
+
+        try {
+            assert firstSpi().connCheckStatusMsgCntSent == 0;
+
+            nextNode = ((ServerImpl)(firstSpi().impl)).ring().nextNode();
+
+            assertNotNull(nextNode);
+
+            nextSpi = null;
+
+            for (int i = 1; i < spis.size(); i++)
+                if (spis.get(i).getLocalNode().id().equals(nextNode.id())) {
+                    nextSpi = (TestTcpDiscoverySpi)spis.get(i);
+                    break;
+                }
+
+            assertNotNull(nextSpi);
+
+            assert nextSpi.connCheckStatusMsgCntReceived == 0;
+
+            nextNodeVer = nextNode.version();
+
+            // Overriding the version of the next node. Connection check 
message must not been sent to it.
+            nextNode.version(new 
IgniteProductVersion(TcpDiscoverySpi.FAILURE_DETECTION_MAJOR_VER,
+                (byte)(TcpDiscoverySpi.FAILURE_DETECTION_MINOR_VER - 1), 
TcpDiscoverySpi.FAILURE_DETECTION_MAINT_VER,
+                0l, null));
+
+            firstSpi().countConnCheckMsg = true;
+            nextSpi.countConnCheckMsg = true;
+
+            Thread.sleep(firstSpi().failureDetectionTimeout() / 2);
+
+            firstSpi().countConnCheckMsg = false;
+            nextSpi.countConnCheckMsg = false;
+
+            int sent = firstSpi().connCheckStatusMsgCntSent;
+            int received = nextSpi.connCheckStatusMsgCntReceived;
+
+            assert sent == 0 : "messages sent: " + sent;
+            assert received == 0 : "messages received: " + received;
+        }
+        finally {
+            firstSpi().resetState();
+
+            if (nextSpi != null)
+                nextSpi.resetState();
+
+            if (nextNode != null && nextNodeVer != null)
+                nextNode.version(nextNodeVer);
+        }
+    }
+
+    /**
+     * Returns the first spi with failure detection timeout enabled.
+     *
+     * @return SPI.
+     */
+    private TestTcpDiscoverySpi firstSpi() {
+        return (TestTcpDiscoverySpi)spis.get(0);
+    }
+
+
+    /**
+     * Returns the second spi with failure detection timeout enabled.
+     *
+     * @return SPI.
+     */
+    private TestTcpDiscoverySpi secondSpi() {
+        return (TestTcpDiscoverySpi)spis.get(1);
+    }
+
+    /**
+     *
+     */
+    private static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
+        /** */
+        private volatile boolean openSocketTimeout;
+
+        /** */
+        private volatile boolean openSocketTimeoutWait;
+
+        /** */
+        private volatile boolean writeToSocketTimeoutWait;
+
+        /** */
+        private volatile boolean countConnCheckMsg;
+
+        /** */
+        private volatile int connCheckStatusMsgCntSent;
+
+        /** */
+        private volatile int connCheckStatusMsgCntReceived;
+
+        /** */
+        private volatile boolean validTimeout = true;
+
+        /** */
+        private volatile IgniteSpiOperationTimeoutException err;
+
+
+        /** {@inheritDoc} */
+        @Override protected Socket openSocket(InetSocketAddress sockAddr, 
IgniteSpiOperationTimeoutHelper timeoutHelper)
+            throws IOException, IgniteSpiOperationTimeoutException {
+
+            if (openSocketTimeout) {
+                err = new IgniteSpiOperationTimeoutException("Timeout: 
openSocketTimeout");
+                throw err;
+            }
+            else if (openSocketTimeoutWait) {
+                long timeout = timeoutHelper.nextTimeoutChunk(0);
+
+                try {
+                    Thread.sleep(timeout + 1000);
+                }
+                catch (InterruptedException e) {
+                    // Ignore
+                }
+
+                try {
+                    timeoutHelper.nextTimeoutChunk(0);
+                }
+                catch (IgniteSpiOperationTimeoutException e) {
+                    throw (err = new 
IgniteSpiOperationTimeoutException("Timeout: openSocketTimeoutWait"));
+                }
+            }
+
+            Socket sock = super.openSocket(sockAddr, timeoutHelper);
+
+            try {
+                Thread.sleep(1500);
+            } catch (InterruptedException e) {
+                // Ignore
+            }
+
+            return sock;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(Socket sock, 
TcpDiscoveryAbstractMessage msg, long timeout)
+            throws IOException, IgniteCheckedException {
+            if (!(msg instanceof TcpDiscoveryPingRequest)) {
+                super.writeToSocket(sock, msg, timeout);
+                return;
+            }
+
+            if (timeout >= IgniteConfiguration.DFLT_FAILURE_DETECTION_TIMEOUT) 
{
+                validTimeout = false;
+
+                throw new IgniteCheckedException("Invalid timeout: " + 
timeout);
+            }
+
+            if (writeToSocketTimeoutWait) {
+                try {
+                    Thread.sleep(timeout);
+                }
+                catch (InterruptedException e) {
+                    // Ignore
+                }
+            }
+            else
+                super.writeToSocket(sock, msg, timeout);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(Socket sock, 
TcpDiscoveryAbstractMessage msg,
+            GridByteArrayOutputStream bout, long timeout) throws IOException, 
IgniteCheckedException {
+            if (countConnCheckMsg && msg instanceof 
TcpDiscoveryConnectionCheckMessage)
+                connCheckStatusMsgCntSent++;
+
+            super.writeToSocket(sock, msg, bout, timeout);
+        }
+
+        /** {@inheritDoc} */
+        protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket 
sock, int res, long timeout)
+            throws IOException {
+            if (countConnCheckMsg && msg instanceof 
TcpDiscoveryConnectionCheckMessage)
+                connCheckStatusMsgCntReceived++;
+
+            super.writeToSocket(msg, sock, res, timeout);
+        }
+
+        /**
+         *
+         */
+        private void resetState() {
+            openSocketTimeout = false;
+            openSocketTimeoutWait = false;
+            writeToSocketTimeoutWait = false;
+            err = null;
+            validTimeout = true;
+            connCheckStatusMsgCntSent = 0;
+            connCheckStatusMsgCntReceived = 0;
+            countConnCheckMsg = false;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31ab0dd3/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
index 2e35c73..d77c432 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
@@ -44,7 +44,7 @@ public class IgniteSpiDiscoverySelfTestSuite extends 
TestSuite {
 
         suite.addTest(new TestSuite(TcpDiscoverySelfTest.class));
         suite.addTest(new TestSuite(TcpDiscoverySpiSelfTest.class));
-        suite.addTest(new 
TestSuite(TcpDiscoverySpiFailureThresholdSelfTest.class));
+        suite.addTest(new 
TestSuite(TcpDiscoverySpiFailureTimeoutSelfTest.class));
         suite.addTest(new TestSuite(TcpDiscoverySpiStartStopSelfTest.class));
         suite.addTest(new TestSuite(TcpDiscoverySpiConfigSelfTest.class));
         suite.addTest(new 
TestSuite(TcpDiscoveryMarshallerCheckSelfTest.class));
@@ -55,7 +55,7 @@ public class IgniteSpiDiscoverySelfTestSuite extends 
TestSuite {
         suite.addTest(new TestSuite(TcpClientDiscoverySpiSelfTest.class));
         suite.addTest(new 
TestSuite(TcpClientDiscoveryMarshallerCheckSelfTest.class));
         suite.addTest(new TestSuite(TcpClientDiscoverySpiMulticastTest.class));
-        suite.addTest(new 
TestSuite(TcpClientDiscoverySpiFailureThresholdSelfTest.class));
+        suite.addTest(new 
TestSuite(TcpClientDiscoverySpiFailureTimeoutSelfTest.class));
 
         suite.addTest(new 
TestSuite(TcpDiscoveryNodeConsistentIdSelfTest.class));
         suite.addTest(new 
TestSuite(TcpDiscoveryNodeConfigConsistentIdSelfTest.class));

Reply via email to