ignite-752: added tests for client spi
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/af624eb4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/af624eb4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/af624eb4 Branch: refs/heads/ignite-752 Commit: af624eb4fb1db6563ea583b298b7471de2506ab2 Parents: 0cc31b2 Author: Denis Magda <dma...@gridgain.com> Authored: Sun Jul 19 12:37:14 2015 +0300 Committer: Denis Magda <dma...@gridgain.com> Committed: Sun Jul 19 12:37:14 2015 +0300 ---------------------------------------------------------------------- .../ignite/spi/discovery/tcp/ServerImpl.java | 7 +- .../spi/discovery/tcp/TcpDiscoverySpi.java | 6 +- ...entDiscoverySpiFailureThresholdSelfTest.java | 83 ++++++ .../tcp/TcpClientDiscoverySpiSelfTest.java | 77 ++++-- .../tcp/TcpDiscoverySpiConfigSelfTest.java | 4 + ...TcpDiscoverySpiFailureThresholdSelfTest.java | 270 ++++++++++++++++--- .../IgniteSpiDiscoverySelfTestSuite.java | 1 + 7 files changed, 385 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/af624eb4/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index f05d027..6e0d199 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -622,8 +622,11 @@ class ServerImpl extends TcpDiscoveryImpl { /** {@inheritDoc} */ @Override protected void onDataReceived() { if (spi.failureDetectionThresholdEnabled()) { - locNode.lastDataReceivedTime(U.currentTimeMillis()); - chkStatusSnd.onDataReceived(); + if (locNode != null) + locNode.lastDataReceivedTime(U.currentTimeMillis()); + + if (chkStatusSnd != null) + chkStatusSnd.onDataReceived(); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/af624eb4/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index f231c29..23166f2 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -158,13 +158,13 @@ import java.util.concurrent.atomic.*; @DiscoverySpiHistorySupport(true) public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, TcpDiscoverySpiMBean { /** Failure detection threshold feature major version. */ - final static int FAILURE_DETECTION_MAJOR_VER = 1; + final static byte FAILURE_DETECTION_MAJOR_VER = 1; /** Failure detection threshold feature minor version. */ - final static int FAILURE_DETECTION_MINOR_VER = 3; + final static byte FAILURE_DETECTION_MINOR_VER = 4; /** Failure detection threshold feature maintainance version. */ - final static int FAILURE_DETECTION_MAINT_VER = 1; + final static byte FAILURE_DETECTION_MAINT_VER = 1; /** Node attribute that is mapped to node's external addresses (value is <tt>disc.tcp.ext-addrs</tt>). */ public static final String ATTR_EXT_ADDRS = "disc.tcp.ext-addrs"; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/af624eb4/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java new file mode 100644 index 0000000..202b328 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp; + +import org.apache.ignite.internal.util.typedef.*; + +import java.util.concurrent.*; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +/** + * Client-based discovery SPI test with failure detection threshold enabled. + */ +public class TcpClientDiscoverySpiFailureThresholdSelfTest extends TcpClientDiscoverySpiSelfTest { + /** */ + private final static int FAILURE_AWAIT_TIME = 7_000; + + /** {@inheritDoc} */ + @Override protected boolean useFailureDetectionThreshold() { + return true; + } + + /** {@inheritDoc} */ + @Override protected long failureDetectionThreshold() { + return 10_000; + } + + /** {@inheritDoc} */ + protected void await(CountDownLatch latch) throws InterruptedException { + assertTrue("Latch count: " + latch.getCount(), latch.await(failureDetectionThreshold() + + FAILURE_AWAIT_TIME, MILLISECONDS)); + } + + /** + * @throws Exception in case of error. + */ + public void testFailureDetectionThresholdEnabled() throws Exception { + startServerNodes(1); + startClientNodes(1); + + checkNodes(1, 1); + + assertTrue(((TcpDiscoverySpi)(G.ignite("server-0").configuration().getDiscoverySpi())). + failureDetectionThresholdEnabled()); + assertEquals(failureDetectionThreshold(), + ((TcpDiscoverySpi)(G.ignite("server-0").configuration().getDiscoverySpi())).failureDetectionThreshold()); + + assertTrue(((TcpDiscoverySpi)(G.ignite("client-0").configuration().getDiscoverySpi())). + failureDetectionThresholdEnabled()); + assertEquals(failureDetectionThreshold(), + ((TcpDiscoverySpi)(G.ignite("client-0").configuration().getDiscoverySpi())).failureDetectionThreshold()); + } + + /** {@inheritDoc} */ + public void testReconnectSegmentedAfterJoinTimeoutServerFailed() throws Exception { + reconnectSegmentedAfterJoinTimeout(true, failureDetectionThreshold() + FAILURE_AWAIT_TIME); + } + + /** {@inheritDoc} */ + public void testReconnectSegmentedAfterJoinTimeoutNetworkError() throws Exception { + reconnectSegmentedAfterJoinTimeout(false, failureDetectionThreshold() + FAILURE_AWAIT_TIME); + } + + /** {@inheritDoc} */ + public void testDisconnectAfterNetworkTimeout() throws Exception { + testDisconnectAfterNetworkTimeout(failureDetectionThreshold() + FAILURE_AWAIT_TIME); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/af624eb4/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java index be442b5..458e545 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java @@ -114,6 +114,9 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { /** */ private boolean reconnectDisabled; + /** */ + private boolean useFailureDetectionThreshold; + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); @@ -154,13 +157,17 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { else throw new IllegalArgumentException(); - if (longSockTimeouts) { + if (longSockTimeouts && !useFailureDetectionThreshold()) { disco.setAckTimeout(2000); disco.setSocketTimeout(2000); } disco.setJoinTimeout(joinTimeout); - disco.setNetworkTimeout(netTimeout); + + if (!useFailureDetectionThreshold()) + disco.setNetworkTimeout(netTimeout); + else + cfg.setFailureDetectionThreshold(failureDetectionThreshold()); disco.setClientReconnectDisabled(reconnectDisabled); @@ -205,6 +212,24 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { } /** + * Checks whether to use failure detection threshold instead of setting explicit timeouts. + * + * @return {@code true} if use. + */ + protected boolean useFailureDetectionThreshold() { + return false; + } + + /** + * Gets failure detection threshold to use. + * + * @return Failure detection threshold. + */ + protected long failureDetectionThreshold() { + return 0; + } + + /** * @throws Exception If failed. */ public void testJoinTimeout() throws Exception { @@ -418,7 +443,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { Ignite srv1 = G.ignite("server-1"); Ignite client = G.ignite("client-0"); - ((TcpDiscoverySpi)srv0.configuration().getDiscoverySpi()).setAckTimeout(1000); + if (!useFailureDetectionThreshold()) + ((TcpDiscoverySpi)srv0.configuration().getDiscoverySpi()).setAckTimeout(1000); ((TestTcpDiscoverySpi)client.configuration().getDiscoverySpi()).pauseSocketWrite(); @@ -756,8 +782,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { @Override public void apply(TcpDiscoveryAbstractMessage msg) { try { Thread.sleep(1000000); - } - catch (InterruptedException ignored) { + } catch (InterruptedException ignored) { Thread.interrupted(); } } @@ -896,7 +921,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { startClientNodes(1); assertEquals(G.ignite("server-0").cluster().localNode().id(), - ((TcpDiscoveryNode) G.ignite("client-0").cluster().localNode()).clientRouterNodeId()); + ((TcpDiscoveryNode)G.ignite("client-0").cluster().localNode()).clientRouterNodeId()); checkNodes(2, 1); @@ -1460,21 +1485,21 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testReconnectSegmentedAfterJoinTimeoutServerFailed() throws Exception { - reconnectSegmentedAfterJoinTimeout(true); + reconnectSegmentedAfterJoinTimeout(true, 10_000); } /** * @throws Exception If failed. */ public void testReconnectSegmentedAfterJoinTimeoutNetworkError() throws Exception { - reconnectSegmentedAfterJoinTimeout(false); + reconnectSegmentedAfterJoinTimeout(false, 10_000); } /** * @param failSrv If {@code true} fails server, otherwise server does not send join message. * @throws Exception If failed. */ - private void reconnectSegmentedAfterJoinTimeout(boolean failSrv) throws Exception { + protected void reconnectSegmentedAfterJoinTimeout(boolean failSrv, long awaitTimeout) throws Exception { netTimeout = 4000; joinTimeout = 5000; @@ -1542,9 +1567,9 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { clientSpi.brakeConnection(); } - assertTrue(disconnectLatch.await(10_000, MILLISECONDS)); + assertTrue(disconnectLatch.await(awaitTimeout, MILLISECONDS)); - assertTrue(segmentedLatch.await(10_000, MILLISECONDS)); + assertTrue(segmentedLatch.await(awaitTimeout, MILLISECONDS)); waitSegmented(client); @@ -1557,7 +1582,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { @Override public boolean apply() { return srv.cluster().nodes().size() == 1; } - }, 10_000); + }, awaitTimeout); checkNodes(1, 0); } @@ -1590,8 +1615,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { assertEquals(1, disconnectLatch.getCount()); disconnectLatch.countDown(); - } - else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { log.info("Reconnected event."); assertEquals(1, reconnectLatch.getCount()); @@ -1599,8 +1623,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { assertFalse(err.get()); reconnectLatch.countDown(); - } - else { + } else { log.error("Unexpected event: " + evt); err.set(true); @@ -1639,6 +1662,14 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testDisconnectAfterNetworkTimeout() throws Exception { + testDisconnectAfterNetworkTimeout(10_000); + } + + /** + * @param timeout Timeout to wait. + * @throws Exception if failed. + */ + public void testDisconnectAfterNetworkTimeout(long timeout) throws Exception { netTimeout = 5000; joinTimeout = 60_000; maxMissedClientHbs = 2; @@ -1695,7 +1726,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { clientSpi.brakeConnection(); - assertTrue(disconnectLatch.await(10_000, MILLISECONDS)); + assertTrue(disconnectLatch.await(timeout, MILLISECONDS)); log.info("Fail client connection2."); @@ -1704,7 +1735,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { clientSpi.brakeConnection(); - assertTrue(reconnectLatch.await(10_000, MILLISECONDS)); + assertTrue(reconnectLatch.await(timeout, MILLISECONDS)); clientNodeIds.clear(); @@ -1715,7 +1746,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { public boolean apply() { return srv.cluster().nodes().size() == 2; } - }, 10_000); + }, timeout); checkNodes(1, 1); @@ -1759,7 +1790,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { * @param cnt Number of nodes. * @throws Exception In case of error. */ - private void startServerNodes(int cnt) throws Exception { + protected void startServerNodes(int cnt) throws Exception { for (int i = 0; i < cnt; i++) { Ignite g = startGrid("server-" + srvIdx.getAndIncrement()); @@ -1771,7 +1802,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { * @param cnt Number of nodes. * @throws Exception In case of error. */ - private void startClientNodes(int cnt) throws Exception { + protected void startClientNodes(int cnt) throws Exception { for (int i = 0; i < cnt; i++) { Ignite g = startGrid("client-" + clientIdx.getAndIncrement()); @@ -1888,7 +1919,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { * @param srvCnt Number of server nodes. * @param clientCnt Number of client nodes. */ - private void checkNodes(int srvCnt, int clientCnt) { + protected void checkNodes(int srvCnt, int clientCnt) { long topVer = -1; for (int i = 0; i < srvCnt; i++) { @@ -1950,7 +1981,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { * @param latch Latch. * @throws InterruptedException If interrupted. */ - private void await(CountDownLatch latch) throws InterruptedException { + protected void await(CountDownLatch latch) throws InterruptedException { assertTrue("Latch count: " + latch.getCount(), latch.await(10_000, MILLISECONDS)); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/af624eb4/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java index 3e895be..91f4f9e 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.spi.discovery.tcp; +import org.apache.ignite.configuration.*; import org.apache.ignite.testframework.junits.spi.*; /** @@ -41,5 +42,8 @@ public class TcpDiscoverySpiConfigSelfTest extends GridSpiAbstractConfigTest<Tcp checkNegativeSpiProperty(new TcpDiscoverySpi(), "threadPriority", -1); checkNegativeSpiProperty(new TcpDiscoverySpi(), "maxMissedHeartbeats", 0); checkNegativeSpiProperty(new TcpDiscoverySpi(), "statisticsPrintFrequency", 0); + checkNegativeSpiProperty(new TcpDiscoverySpi(), "connectionCheckFrequency", 0); + checkNegativeSpiProperty(new TcpDiscoverySpi(), "connectionCheckFrequency", + IgniteConfiguration.DFLT_FAILURE_DETECTION_THRESHOLD + 1000); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/af624eb4/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java index db0d9c5..fab3628 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java @@ -20,11 +20,15 @@ package org.apache.ignite.spi.discovery.tcp; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.util.io.*; +import org.apache.ignite.lang.*; import org.apache.ignite.spi.*; import org.apache.ignite.spi.discovery.*; +import org.apache.ignite.spi.discovery.tcp.internal.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; import org.apache.ignite.spi.discovery.tcp.messages.*; +import org.jetbrains.annotations.*; import java.io.*; import java.net.*; @@ -34,26 +38,56 @@ import java.net.*; */ public class TcpDiscoverySpiFailureThresholdSelfTest extends AbstractDiscoverySelfTest { /** */ - private static TestTcpDiscoverySpi firstSpi; + private static final int SPI_COUNT = 7; /** */ - private static TestTcpDiscoverySpi secondSpi; + private static final long CONN_CHECK_FREQ = 2000; + + /** */ + private static TestTcpDiscoverySpi spis[] = new TestTcpDiscoverySpi[SPI_COUNT]; /** */ private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); /** {@inheritDoc} */ + @Override protected int getSpiCount() { + return SPI_COUNT; + } + + /** {@inheritDoc} */ @Override protected DiscoverySpi getSpi(int idx) { TestTcpDiscoverySpi spi = new TestTcpDiscoverySpi(); - if (idx == 0) - firstSpi = spi; - else - secondSpi = spi; - spi.setMetricsProvider(createMetricsProvider()); spi.setIpFinder(ipFinder); + spis[idx] = spi; + + switch (idx) { + case 0: + spi.setConnectionCheckFrequency(CONN_CHECK_FREQ); + break; + case 1: + // Ignore + break; + case 2: + spi.setAckTimeout(3000); + break; + case 3: + spi.setSocketTimeout(4000); + break; + case 4: + spi.setReconnectCount(4); + break; + case 5: + spi.setMaxAckTimeout(10000); + break; + case 6: + spi.setNetworkTimeout(4000); + break; + default: + assert false; + } return spi; } @@ -61,11 +95,21 @@ public class TcpDiscoverySpiFailureThresholdSelfTest extends AbstractDiscoverySe * @throws Exception In case of error. */ public void testFailureDetectionThresholdEnabled() throws Exception { - assertTrue(firstSpi.failureDetectionThresholdEnabled()); - assertTrue(secondSpi.failureDetectionThresholdEnabled()); + assertTrue(firstSpi().failureDetectionThresholdEnabled()); + assertTrue(secondSpi().failureDetectionThresholdEnabled()); + + assertEquals(IgniteConfiguration.DFLT_FAILURE_DETECTION_THRESHOLD, firstSpi().failureDetectionThreshold()); + assertEquals(IgniteConfiguration.DFLT_FAILURE_DETECTION_THRESHOLD, secondSpi().failureDetectionThreshold()); + } - assertEquals(IgniteConfiguration.DFLT_FAILURE_DETECTION_THRESHOLD, firstSpi.failureDetectionThreshold()); - assertEquals(IgniteConfiguration.DFLT_FAILURE_DETECTION_THRESHOLD, secondSpi.failureDetectionThreshold()); + /** + * @throws Exception In case of error. + */ + public void testFailureDetectionThresholdDisabled() throws Exception { + for (int i = 2; i < spis.length; i++) { + assertFalse(spis[i].failureDetectionThresholdEnabled()); + assertEquals(0, spis[i].failureDetectionThreshold()); + } } /** @@ -73,23 +117,23 @@ public class TcpDiscoverySpiFailureThresholdSelfTest extends AbstractDiscoverySe */ public void testFailureDetectionOnSocketOpen() throws Exception { try { - ClusterNode node = secondSpi.getLocalNode(); + ClusterNode node = secondSpi().getLocalNode(); - firstSpi.openSocketTimeout = true; + firstSpi().openSocketTimeout = true; - assertFalse(firstSpi.pingNode(node.id())); - assertTrue(firstSpi.validTimeout); - assertTrue(firstSpi.err.getMessage().equals("Timeout: openSocketTimeout")); + assertFalse(firstSpi().pingNode(node.id())); + assertTrue(firstSpi().validTimeout); + assertTrue(firstSpi().err.getMessage().equals("Timeout: openSocketTimeout")); - firstSpi.openSocketTimeout = false; - firstSpi.openSocketTimeoutWait = true; + firstSpi().openSocketTimeout = false; + firstSpi().openSocketTimeoutWait = true; - assertFalse(firstSpi.pingNode(node.id())); - assertTrue(firstSpi.validTimeout); - assertTrue(firstSpi.err.getMessage().equals("Timeout: openSocketTimeoutWait")); + assertFalse(firstSpi().pingNode(node.id())); + assertTrue(firstSpi().validTimeout); + assertTrue(firstSpi().err.getMessage().equals("Timeout: openSocketTimeoutWait")); } finally { - firstSpi.resetState(); + firstSpi().resetState(); } } @@ -99,41 +143,176 @@ public class TcpDiscoverySpiFailureThresholdSelfTest extends AbstractDiscoverySe */ public void testFailureDetectionOnSocketWrite() throws Exception { try { - ClusterNode node = secondSpi.getLocalNode(); + ClusterNode node = secondSpi().getLocalNode(); + + firstSpi().writeToSocketTimeoutWait = true; + + assertFalse(firstSpi().pingNode(node.id())); + assertTrue(firstSpi().validTimeout); + + firstSpi().writeToSocketTimeoutWait = false; + + assertTrue(firstSpi().pingNode(node.id())); + assertTrue(firstSpi().validTimeout); + } + finally { + firstSpi().resetState(); + } + } + + /** + * @throws Exception In case of error. + */ + public void testConnectionCheckMessage() throws Exception { + TestTcpDiscoverySpi nextSpi = null; + + try { + assert firstSpi().connCheckStatusMsgCntSent == 0; + + TcpDiscoveryNode nextNode = ((ServerImpl)(firstSpi().impl)).ring().nextNode(); + + assertNotNull(nextNode); + + nextSpi = null; + + for (int i = 1; i < spis.length; i++) + if (spis[i].getLocalNode().id().equals(nextNode.id())) { + nextSpi = spis[i]; + break; + } + + assertNotNull(nextSpi); + + assert nextSpi.connCheckStatusMsgCntReceived == 0; + + firstSpi().countConnCheckMsg = true; + nextSpi.countConnCheckMsg = true; + + Thread.sleep(CONN_CHECK_FREQ * 5); + + firstSpi().countConnCheckMsg = false; + nextSpi.countConnCheckMsg = false; + + int sent = firstSpi().connCheckStatusMsgCntSent; + int received = nextSpi.connCheckStatusMsgCntReceived; + + assert sent >= 3 && sent < 7 : "messages sent: " + sent; + assert received >= 3 && received < 7 : "messages received: " + received; + } + finally { + firstSpi().resetState(); + + if (nextSpi != null) + nextSpi.resetState(); + } + } + + /** + * @throws Exception In case of error. + */ + public void testConnectionCheckMessageBackwardCompatibility() throws Exception { + TestTcpDiscoverySpi nextSpi = null; + TcpDiscoveryNode nextNode = null; + + IgniteProductVersion nextNodeVer = null; + + try { + assert firstSpi().connCheckStatusMsgCntSent == 0; + + nextNode = ((ServerImpl)(firstSpi().impl)).ring().nextNode(); + + assertNotNull(nextNode); - firstSpi.writeToSocketTimeoutWait = true; + nextSpi = null; - assertFalse(firstSpi.pingNode(node.id())); - assertTrue(firstSpi.validTimeout); + for (int i = 1; i < spis.length; i++) + if (spis[i].getLocalNode().id().equals(nextNode.id())) { + nextSpi = spis[i]; + break; + } + + assertNotNull(nextSpi); + + assert nextSpi.connCheckStatusMsgCntReceived == 0; + + nextNodeVer = nextNode.version(); + + // Overriding the version of the next node. Connection check message must not been sent to it. + nextNode.version(new IgniteProductVersion(TcpDiscoverySpi.FAILURE_DETECTION_MAJOR_VER, + (byte)(TcpDiscoverySpi.FAILURE_DETECTION_MINOR_VER - 1), TcpDiscoverySpi.FAILURE_DETECTION_MAINT_VER, + 0l, null)); + + firstSpi().countConnCheckMsg = true; + nextSpi.countConnCheckMsg = true; - firstSpi.writeToSocketTimeoutWait = false; + Thread.sleep(CONN_CHECK_FREQ * 5); - assertTrue(firstSpi.pingNode(node.id())); - assertTrue(firstSpi.validTimeout); + firstSpi().countConnCheckMsg = false; + nextSpi.countConnCheckMsg = false; + + int sent = firstSpi().connCheckStatusMsgCntSent; + int received = nextSpi.connCheckStatusMsgCntReceived; + + assert sent == 0 : "messages sent: " + sent; + assert received == 0 : "messages received: " + received; } finally { - firstSpi.resetState(); + firstSpi().resetState(); + + if (nextSpi != null) + nextSpi.resetState(); + + if (nextNode != null && nextNodeVer != null) + nextNode.version(nextNodeVer); } } /** + * Returns the first spi with failure detection threshold enabled. + * + * @return SPI. + */ + private TestTcpDiscoverySpi firstSpi() { + return spis[0]; + } + + + /** + * Returns the second spi with failure detection threshold enabled. + * + * @return SPI. + */ + private TestTcpDiscoverySpi secondSpi() { + return spis[1]; + } + + /** * */ private static class TestTcpDiscoverySpi extends TcpDiscoverySpi { /** */ - private boolean openSocketTimeout; + private volatile boolean openSocketTimeout; + + /** */ + private volatile boolean openSocketTimeoutWait; + + /** */ + private volatile boolean writeToSocketTimeoutWait; + + /** */ + private volatile boolean countConnCheckMsg; /** */ - private boolean openSocketTimeoutWait; + private volatile int connCheckStatusMsgCntSent; /** */ - private boolean writeToSocketTimeoutWait; + private volatile int connCheckStatusMsgCntReceived; /** */ - private boolean validTimeout = true; + private volatile boolean validTimeout = true; /** */ - private IgniteSpiOperationTimeoutException err; + private volatile IgniteSpiOperationTimeoutException err; /** {@inheritDoc} */ @@ -200,6 +379,24 @@ public class TcpDiscoverySpiFailureThresholdSelfTest extends AbstractDiscoverySe super.writeToSocket(sock, msg, timeout); } + /** {@inheritDoc} */ + @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, + GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { + if (countConnCheckMsg && msg instanceof TcpDiscoveryConnectionCheckMessage) + connCheckStatusMsgCntSent++; + + super.writeToSocket(sock, msg, bout, timeout); + } + + /** {@inheritDoc} */ + protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res, long timeout) + throws IOException { + if (countConnCheckMsg && msg instanceof TcpDiscoveryConnectionCheckMessage) + connCheckStatusMsgCntReceived++; + + super.writeToSocket(msg, sock, res, timeout); + } + /** * */ @@ -209,6 +406,9 @@ public class TcpDiscoverySpiFailureThresholdSelfTest extends AbstractDiscoverySe writeToSocketTimeoutWait = false; err = null; validTimeout = true; + connCheckStatusMsgCntSent = 0; + connCheckStatusMsgCntReceived = 0; + countConnCheckMsg = false; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/af624eb4/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java index 357fd93..a78ab25 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java @@ -55,6 +55,7 @@ public class IgniteSpiDiscoverySelfTestSuite extends TestSuite { suite.addTest(new TestSuite(TcpClientDiscoverySpiSelfTest.class)); suite.addTest(new TestSuite(TcpClientDiscoveryMarshallerCheckSelfTest.class)); suite.addTest(new TestSuite(TcpClientDiscoverySpiMulticastTest.class)); + suite.addTest(new TestSuite(TcpClientDiscoverySpiFailureThresholdSelfTest.class)); suite.addTest(new TestSuite(TcpDiscoveryNodeConsistentIdSelfTest.class));