http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31ab0dd3/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java new file mode 100644 index 0000000..fbea187 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java @@ -0,0 +1,402 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.util.io.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.spi.discovery.*; +import org.apache.ignite.spi.discovery.tcp.internal.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.spi.discovery.tcp.messages.*; + +import java.io.*; +import java.net.*; + +/** + * + */ +public class TcpDiscoverySpiFailureTimeoutSelfTest extends AbstractDiscoverySelfTest { + /** */ + private static final int SPI_COUNT = 6; + + /** */ + private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected int getSpiCount() { + return SPI_COUNT; + } + + /** {@inheritDoc} */ + @Override protected DiscoverySpi getSpi(int idx) { + TestTcpDiscoverySpi spi = new TestTcpDiscoverySpi(); + + spi.setMetricsProvider(createMetricsProvider()); + spi.setIpFinder(ipFinder); + + switch (idx) { + case 0: + case 1: + // Ignore + break; + case 2: + spi.setAckTimeout(3000); + break; + case 3: + spi.setSocketTimeout(4000); + break; + case 4: + spi.setReconnectCount(4); + break; + case 5: + spi.setMaxAckTimeout(10000); + break; + default: + assert false; + } + + return spi; + } + + /** + * @throws Exception In case of error. + */ + public void testFailureDetectionTimeoutEnabled() throws Exception { + assertTrue(firstSpi().failureDetectionTimeoutEnabled()); + assertTrue(secondSpi().failureDetectionTimeoutEnabled()); + + assertEquals(IgniteConfiguration.DFLT_FAILURE_DETECTION_TIMEOUT.longValue(), + firstSpi().failureDetectionTimeout()); + assertEquals(IgniteConfiguration.DFLT_FAILURE_DETECTION_TIMEOUT.longValue(), + secondSpi().failureDetectionTimeout()); + } + + /** + * @throws Exception In case of error. + */ + public void testFailureDetectionTimeoutDisabled() throws Exception { + for (int i = 2; i < spis.size(); i++) { + assertFalse(((TcpDiscoverySpi)spis.get(i)).failureDetectionTimeoutEnabled()); + assertEquals(0, ((TcpDiscoverySpi)spis.get(i)).failureDetectionTimeout()); + } + } + + /** + * @throws Exception In case of error. + */ + public void testFailureDetectionOnSocketOpen() throws Exception { + try { + ClusterNode node = secondSpi().getLocalNode(); + + firstSpi().openSocketTimeout = true; + + assertFalse(firstSpi().pingNode(node.id())); + assertTrue(firstSpi().validTimeout); + assertTrue(firstSpi().err.getMessage().equals("Timeout: openSocketTimeout")); + + firstSpi().openSocketTimeout = false; + firstSpi().openSocketTimeoutWait = true; + + assertFalse(firstSpi().pingNode(node.id())); + assertTrue(firstSpi().validTimeout); + assertTrue(firstSpi().err.getMessage().equals("Timeout: openSocketTimeoutWait")); + } + finally { + firstSpi().resetState(); + } + } + + + /** + * @throws Exception In case of error. + */ + public void testFailureDetectionOnSocketWrite() throws Exception { + try { + ClusterNode node = secondSpi().getLocalNode(); + + firstSpi().writeToSocketTimeoutWait = true; + + assertFalse(firstSpi().pingNode(node.id())); + assertTrue(firstSpi().validTimeout); + + firstSpi().writeToSocketTimeoutWait = false; + + assertTrue(firstSpi().pingNode(node.id())); + assertTrue(firstSpi().validTimeout); + } + finally { + firstSpi().resetState(); + } + } + + /** + * @throws Exception In case of error. + */ + public void testConnectionCheckMessage() throws Exception { + TestTcpDiscoverySpi nextSpi = null; + + try { + assert firstSpi().connCheckStatusMsgCntSent == 0; + + TcpDiscoveryNode nextNode = ((ServerImpl)(firstSpi().impl)).ring().nextNode(); + + assertNotNull(nextNode); + + nextSpi = null; + + for (int i = 1; i < spis.size(); i++) + if (spis.get(i).getLocalNode().id().equals(nextNode.id())) { + nextSpi = (TestTcpDiscoverySpi)spis.get(i); + break; + } + + assertNotNull(nextSpi); + + assert nextSpi.connCheckStatusMsgCntReceived == 0; + + firstSpi().countConnCheckMsg = true; + nextSpi.countConnCheckMsg = true; + + Thread.sleep(firstSpi().failureDetectionTimeout()); + + firstSpi().countConnCheckMsg = false; + nextSpi.countConnCheckMsg = false; + + int sent = firstSpi().connCheckStatusMsgCntSent; + int received = nextSpi.connCheckStatusMsgCntReceived; + + assert sent >= 3 && sent < 7 : "messages sent: " + sent; + assert received >= 3 && received < 7 : "messages received: " + received; + } + finally { + firstSpi().resetState(); + + if (nextSpi != null) + nextSpi.resetState(); + } + } + + /** + * @throws Exception In case of error. + */ + public void testConnectionCheckMessageBackwardCompatibility() throws Exception { + TestTcpDiscoverySpi nextSpi = null; + TcpDiscoveryNode nextNode = null; + + IgniteProductVersion nextNodeVer = null; + + try { + assert firstSpi().connCheckStatusMsgCntSent == 0; + + nextNode = ((ServerImpl)(firstSpi().impl)).ring().nextNode(); + + assertNotNull(nextNode); + + nextSpi = null; + + for (int i = 1; i < spis.size(); i++) + if (spis.get(i).getLocalNode().id().equals(nextNode.id())) { + nextSpi = (TestTcpDiscoverySpi)spis.get(i); + break; + } + + assertNotNull(nextSpi); + + assert nextSpi.connCheckStatusMsgCntReceived == 0; + + nextNodeVer = nextNode.version(); + + // Overriding the version of the next node. Connection check message must not been sent to it. + nextNode.version(new IgniteProductVersion(TcpDiscoverySpi.FAILURE_DETECTION_MAJOR_VER, + (byte)(TcpDiscoverySpi.FAILURE_DETECTION_MINOR_VER - 1), TcpDiscoverySpi.FAILURE_DETECTION_MAINT_VER, + 0l, null)); + + firstSpi().countConnCheckMsg = true; + nextSpi.countConnCheckMsg = true; + + Thread.sleep(firstSpi().failureDetectionTimeout() / 2); + + firstSpi().countConnCheckMsg = false; + nextSpi.countConnCheckMsg = false; + + int sent = firstSpi().connCheckStatusMsgCntSent; + int received = nextSpi.connCheckStatusMsgCntReceived; + + assert sent == 0 : "messages sent: " + sent; + assert received == 0 : "messages received: " + received; + } + finally { + firstSpi().resetState(); + + if (nextSpi != null) + nextSpi.resetState(); + + if (nextNode != null && nextNodeVer != null) + nextNode.version(nextNodeVer); + } + } + + /** + * Returns the first spi with failure detection timeout enabled. + * + * @return SPI. + */ + private TestTcpDiscoverySpi firstSpi() { + return (TestTcpDiscoverySpi)spis.get(0); + } + + + /** + * Returns the second spi with failure detection timeout enabled. + * + * @return SPI. + */ + private TestTcpDiscoverySpi secondSpi() { + return (TestTcpDiscoverySpi)spis.get(1); + } + + /** + * + */ + private static class TestTcpDiscoverySpi extends TcpDiscoverySpi { + /** */ + private volatile boolean openSocketTimeout; + + /** */ + private volatile boolean openSocketTimeoutWait; + + /** */ + private volatile boolean writeToSocketTimeoutWait; + + /** */ + private volatile boolean countConnCheckMsg; + + /** */ + private volatile int connCheckStatusMsgCntSent; + + /** */ + private volatile int connCheckStatusMsgCntReceived; + + /** */ + private volatile boolean validTimeout = true; + + /** */ + private volatile IgniteSpiOperationTimeoutException err; + + + /** {@inheritDoc} */ + @Override protected Socket openSocket(InetSocketAddress sockAddr, IgniteSpiOperationTimeoutHelper timeoutHelper) + throws IOException, IgniteSpiOperationTimeoutException { + + if (openSocketTimeout) { + err = new IgniteSpiOperationTimeoutException("Timeout: openSocketTimeout"); + throw err; + } + else if (openSocketTimeoutWait) { + long timeout = timeoutHelper.nextTimeoutChunk(0); + + try { + Thread.sleep(timeout + 1000); + } + catch (InterruptedException e) { + // Ignore + } + + try { + timeoutHelper.nextTimeoutChunk(0); + } + catch (IgniteSpiOperationTimeoutException e) { + throw (err = new IgniteSpiOperationTimeoutException("Timeout: openSocketTimeoutWait")); + } + } + + Socket sock = super.openSocket(sockAddr, timeoutHelper); + + try { + Thread.sleep(1500); + } catch (InterruptedException e) { + // Ignore + } + + return sock; + } + + /** {@inheritDoc} */ + @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) + throws IOException, IgniteCheckedException { + if (!(msg instanceof TcpDiscoveryPingRequest)) { + super.writeToSocket(sock, msg, timeout); + return; + } + + if (timeout >= IgniteConfiguration.DFLT_FAILURE_DETECTION_TIMEOUT) { + validTimeout = false; + + throw new IgniteCheckedException("Invalid timeout: " + timeout); + } + + if (writeToSocketTimeoutWait) { + try { + Thread.sleep(timeout); + } + catch (InterruptedException e) { + // Ignore + } + } + else + super.writeToSocket(sock, msg, timeout); + } + + /** {@inheritDoc} */ + @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, + GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { + if (countConnCheckMsg && msg instanceof TcpDiscoveryConnectionCheckMessage) + connCheckStatusMsgCntSent++; + + super.writeToSocket(sock, msg, bout, timeout); + } + + /** {@inheritDoc} */ + protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res, long timeout) + throws IOException { + if (countConnCheckMsg && msg instanceof TcpDiscoveryConnectionCheckMessage) + connCheckStatusMsgCntReceived++; + + super.writeToSocket(msg, sock, res, timeout); + } + + /** + * + */ + private void resetState() { + openSocketTimeout = false; + openSocketTimeoutWait = false; + writeToSocketTimeoutWait = false; + err = null; + validTimeout = true; + connCheckStatusMsgCntSent = 0; + connCheckStatusMsgCntReceived = 0; + countConnCheckMsg = false; + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31ab0dd3/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java index 2e35c73..d77c432 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java @@ -44,7 +44,7 @@ public class IgniteSpiDiscoverySelfTestSuite extends TestSuite { suite.addTest(new TestSuite(TcpDiscoverySelfTest.class)); suite.addTest(new TestSuite(TcpDiscoverySpiSelfTest.class)); - suite.addTest(new TestSuite(TcpDiscoverySpiFailureThresholdSelfTest.class)); + suite.addTest(new TestSuite(TcpDiscoverySpiFailureTimeoutSelfTest.class)); suite.addTest(new TestSuite(TcpDiscoverySpiStartStopSelfTest.class)); suite.addTest(new TestSuite(TcpDiscoverySpiConfigSelfTest.class)); suite.addTest(new TestSuite(TcpDiscoveryMarshallerCheckSelfTest.class)); @@ -55,7 +55,7 @@ public class IgniteSpiDiscoverySelfTestSuite extends TestSuite { suite.addTest(new TestSuite(TcpClientDiscoverySpiSelfTest.class)); suite.addTest(new TestSuite(TcpClientDiscoveryMarshallerCheckSelfTest.class)); suite.addTest(new TestSuite(TcpClientDiscoverySpiMulticastTest.class)); - suite.addTest(new TestSuite(TcpClientDiscoverySpiFailureThresholdSelfTest.class)); + suite.addTest(new TestSuite(TcpClientDiscoverySpiFailureTimeoutSelfTest.class)); suite.addTest(new TestSuite(TcpDiscoveryNodeConsistentIdSelfTest.class)); suite.addTest(new TestSuite(TcpDiscoveryNodeConfigConsistentIdSelfTest.class));