ignite-752: added new tests, fixed bugs
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f2a2dcf6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f2a2dcf6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f2a2dcf6 Branch: refs/heads/ignite-752 Commit: f2a2dcf6baac19cef0cbb61592decd2135c6646e Parents: 6d5e7d3 Author: Denis Magda <dma...@gridgain.com> Authored: Wed Jul 22 10:48:56 2015 +0300 Committer: Denis Magda <dma...@gridgain.com> Committed: Wed Jul 22 10:48:56 2015 +0300 ---------------------------------------------------------------------- .../ignite/spi/discovery/tcp/ServerImpl.java | 19 ++- .../spi/discovery/tcp/TcpDiscoverySpi.java | 3 +- ...entDiscoverySpiFailureThresholdSelfTest.java | 120 ++++++++++++++++++- .../tcp/TcpClientDiscoverySpiSelfTest.java | 22 ++-- 4 files changed, 152 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f2a2dcf6/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 877d53c..56472aa 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -501,6 +501,8 @@ class ServerImpl extends TcpDiscoveryImpl { int reconCnt = 0; + boolean openedSock = false; + while (true) { try { if (addr.isUnresolved()) @@ -510,6 +512,8 @@ class ServerImpl extends TcpDiscoveryImpl { sock = spi.openSocket(addr, timeoutHelper); + openedSock = true; + spi.writeToSocket(sock, new TcpDiscoveryPingRequest(locNodeId, clientNodeId), timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); @@ -537,9 +541,14 @@ class ServerImpl extends TcpDiscoveryImpl { errs.add(e); + reconCnt++; + + if (!openedSock && reconCnt == 2) + break; + if (timeoutHelper.checkThresholdReached(e)) break; - else if (!spi.failureDetectionThresholdEnabled() && ++reconCnt == spi.getReconnectCount()) + else if (!spi.failureDetectionThresholdEnabled() && reconCnt == spi.getReconnectCount()) break; } finally { @@ -1846,6 +1855,9 @@ class ServerImpl extends TcpDiscoveryImpl { /** {@inheritDoc} */ @Override protected void noMessageLoop() { + if (locNode == null) + return; + checkConnection(); sendHeartbeatMessage(); @@ -1919,11 +1931,12 @@ class ServerImpl extends TcpDiscoveryImpl { if (debugMode) debugLog("No next node in topology."); - /*if (ring.hasRemoteNodes()) { + if (ring.hasRemoteNodes() && !(msg instanceof TcpDiscoveryConnectionCheckMessage) && + !(msg instanceof TcpDiscoveryStatusCheckMessage && msg.creatorNodeId().equals(locNodeId))) { msg.senderNodeId(locNodeId); addMessage(msg); - }*/ + } break; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f2a2dcf6/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 588ff98..3821a0a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -1369,7 +1369,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T * @throws IOException If IO failed or read timed out. * @throws IgniteCheckedException If unmarshalling failed. */ - protected <T> T readMessage(Socket sock, @Nullable InputStream in, long timeout) throws IOException, IgniteCheckedException { + protected <T> T readMessage(Socket sock, @Nullable InputStream in, long timeout) throws IOException, + IgniteCheckedException { assert sock != null; int oldTimeout = sock.getSoTimeout(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f2a2dcf6/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java index 8e80047..939286d 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java @@ -17,7 +17,14 @@ package org.apache.ignite.spi.discovery.tcp; +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.spi.discovery.tcp.messages.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.net.*; /** * Client-based discovery SPI test with failure detection threshold enabled. @@ -26,6 +33,9 @@ public class TcpClientDiscoverySpiFailureThresholdSelfTest extends TcpClientDisc /** */ private final static int FAILURE_AWAIT_TIME = 7_000; + /** */ + private static boolean useTestSpi; + /** {@inheritDoc} */ @Override protected boolean useFailureDetectionThreshold() { return true; @@ -33,7 +43,7 @@ public class TcpClientDiscoverySpiFailureThresholdSelfTest extends TcpClientDisc /** {@inheritDoc} */ @Override protected long failureDetectionThreshold() { - return 10_000; + return useTestSpi ? 5000 : 10_000; } /** {@inheritDoc} */ @@ -41,6 +51,18 @@ public class TcpClientDiscoverySpiFailureThresholdSelfTest extends TcpClientDisc return failureDetectionThreshold() + FAILURE_AWAIT_TIME; } + /** {@inheritDoc} */ + @Override protected TcpDiscoverySpi getDiscoverySpi() { + return useTestSpi ? new TestTcpDiscoverySpi() : super.getDiscoverySpi(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + return cfg; + } + /** * @throws Exception in case of error. */ @@ -60,4 +82,100 @@ public class TcpClientDiscoverySpiFailureThresholdSelfTest extends TcpClientDisc assertEquals(failureDetectionThreshold(), ((TcpDiscoverySpi)(G.ignite("client-0").configuration().getDiscoverySpi())).failureDetectionThreshold()); } + + /** + * @throws Exception in case of error. + */ + public void testFailureThresholdWorkability() throws Exception { + useTestSpi = true; + + TestTcpDiscoverySpi firstSpi = null; + TestTcpDiscoverySpi secondSpi = null; + + try { + startServerNodes(2); + + checkNodes(2, 0); + + firstSpi = (TestTcpDiscoverySpi)(G.ignite("server-0").configuration().getDiscoverySpi()); + secondSpi = (TestTcpDiscoverySpi)(G.ignite("server-1").configuration().getDiscoverySpi()); + + assert firstSpi.err == null; + + secondSpi.readDelay = failureDetectionThreshold() + 5000; + + assertFalse(firstSpi.pingNode(secondSpi.getLocalNodeId())); + + Thread.sleep(failureDetectionThreshold()); + + assertTrue(firstSpi.err != null && X.hasCause(firstSpi.err, SocketTimeoutException.class)); + + firstSpi.reset(); + secondSpi.reset(); + + assertTrue(firstSpi.pingNode(secondSpi.getLocalNodeId())); + + assertTrue(firstSpi.err == null); + } + finally { + useTestSpi = false; + + if (firstSpi != null) + firstSpi.reset(); + + if (secondSpi != null) + secondSpi.reset(); + } + } + + /** + * + */ + private static class TestTcpDiscoverySpi extends TcpDiscoverySpi { + /** */ + private long readDelay; + + /** */ + private Exception err; + + /** {@inheritDoc} */ + @Override protected <T> T readMessage(Socket sock, @Nullable InputStream in, long timeout) + throws IOException, IgniteCheckedException { + + if (readDelay < failureDetectionThreshold()) { + try { + T msg = super.readMessage(sock, in, timeout); + + return msg; + } + catch (Exception e) { + err = e; + + throw e; + } + } + else { + T msg = super.readMessage(sock, in, timeout); + + if (msg instanceof TcpDiscoveryPingRequest) { + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + // Ignore + } + throw new SocketTimeoutException("Forced timeout"); + } + + return msg; + } + } + + /** + * Resets testing state. + */ + private void reset() { + readDelay = 0; + err = null; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f2a2dcf6/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java index 9df7bd9..a67b5cf 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java @@ -118,7 +118,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); - TestTcpDiscoverySpi disco = new TestTcpDiscoverySpi(); + TcpDiscoverySpi disco = getDiscoverySpi(); disco.setMaxMissedClientHeartbeats(maxMissedClientHbs); @@ -167,7 +167,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { disco.setClientReconnectDisabled(reconnectDisabled); - disco.afterWrite(afterWrite); + if (disco instanceof TestTcpDiscoverySpi) + ((TestTcpDiscoverySpi)disco).afterWrite(afterWrite); cfg.setDiscoverySpi(disco); @@ -177,6 +178,14 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { return cfg; } + /** + * Returns TCP Discovery SPI instance to use in a test. + * @return TCP Discovery SPI. + */ + protected TcpDiscoverySpi getDiscoverySpi() { + return new TestTcpDiscoverySpi(); + } + /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { Collection<InetSocketAddress> addrs = IP_FINDER.getRegisteredAddresses(); @@ -411,12 +420,12 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { final CountDownLatch latch = new CountDownLatch(1); - ((TcpDiscoverySpi)srv1.configuration().getDiscoverySpi()).addIncomeConnectionListener(new IgniteInClosure<Socket>() { + ((TcpDiscoverySpi)srv1.configuration().getDiscoverySpi()).addIncomeConnectionListener(new IgniteInClosure + <Socket>() { @Override public void apply(Socket sock) { try { latch.await(); - } - catch (InterruptedException e) { + } catch (InterruptedException e) { throw new RuntimeException(e); } } @@ -778,8 +787,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { @Override public void apply(TcpDiscoveryAbstractMessage msg) { try { Thread.sleep(1000000); - } - catch (InterruptedException ignored) { + } catch (InterruptedException ignored) { Thread.interrupted(); } }