Squashed commit of IGNITE-1229
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7635e589 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7635e589 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7635e589 Branch: refs/heads/ignite-gg-9615-1 Commit: 7635e5894df6aab477b82253451b729985f632be Parents: 1f00c70 Author: Denis Magda <dma...@gridgain.com> Authored: Mon Aug 17 16:41:03 2015 +0300 Committer: Yakov Zhdanov <yzhda...@gridgain.com> Committed: Mon Aug 17 16:41:03 2015 +0300 ---------------------------------------------------------------------- .../ignite/spi/discovery/tcp/ServerImpl.java | 57 ++++++- .../spi/discovery/tcp/TcpDiscoverySpi.java | 45 ++++-- .../spi/discovery/tcp/TcpDiscoverySelfTest.java | 152 ++++++++++++++++++- .../TcpDiscoverySpiFailureTimeoutSelfTest.java | 8 +- 4 files changed, 245 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7635e589/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 76144e3..40e110f 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -114,7 +114,7 @@ class ServerImpl extends TcpDiscoveryImpl { protected TcpDiscoverySpiState spiState = DISCONNECTED; /** Map with proceeding ping requests. 
*/ - private final ConcurrentMap<InetSocketAddress, IgniteInternalFuture<IgniteBiTuple<UUID, Boolean>>> pingMap = + private final ConcurrentMap<InetSocketAddress, GridPingFutureAdapter<IgniteBiTuple<UUID, Boolean>>> pingMap = new ConcurrentHashMap8<>(); /** @@ -497,9 +497,9 @@ class ServerImpl extends TcpDiscoveryImpl { return F.t(getLocalNodeId(), clientPingRes); } - GridFutureAdapter<IgniteBiTuple<UUID, Boolean>> fut = new GridFutureAdapter<>(); + GridPingFutureAdapter<IgniteBiTuple<UUID, Boolean>> fut = new GridPingFutureAdapter<>(); - IgniteInternalFuture<IgniteBiTuple<UUID, Boolean>> oldFut = pingMap.putIfAbsent(addr, fut); + GridPingFutureAdapter<IgniteBiTuple<UUID, Boolean>> oldFut = pingMap.putIfAbsent(addr, fut); if (oldFut != null) return oldFut.get(); @@ -520,7 +520,11 @@ class ServerImpl extends TcpDiscoveryImpl { long tstamp = U.currentTimeMillis(); - sock = spi.openSocket(addr, timeoutHelper); + sock = spi.createSocket(); + + fut.sock = sock; + + sock = spi.openSocket(sock, addr, timeoutHelper); openedSock = true; @@ -597,6 +601,21 @@ class ServerImpl extends TcpDiscoveryImpl { } } + /** + * Interrupts all existed 'ping' request for the given node. + * + * @param node Node that may be pinged. + */ + private void interruptPing(TcpDiscoveryNode node) { + for (InetSocketAddress addr : spi.getNodeAddresses(node)) { + GridPingFutureAdapter fut = pingMap.get(addr); + + if (fut != null && fut.sock != null) + // Reference to the socket is not set to null. No need to assign it to a local variable. 
+ U.closeQuiet(fut.sock); + } + } + /** {@inheritDoc} */ @Override public void disconnect() throws IgniteSpiException { spiStop0(true); @@ -3366,6 +3385,8 @@ class ServerImpl extends TcpDiscoveryImpl { if (msg.verified() && !locNodeId.equals(leavingNodeId)) { TcpDiscoveryNode leftNode = ring.removeNode(leavingNodeId); + interruptPing(leavingNode); + assert leftNode != null; if (log.isDebugEnabled()) @@ -3533,6 +3554,8 @@ class ServerImpl extends TcpDiscoveryImpl { if (msg.verified()) { node = ring.removeNode(nodeId); + interruptPing(node); + assert node != null; long topVer; @@ -5142,4 +5165,30 @@ class ServerImpl extends TcpDiscoveryImpl { spi.writeToSocket(sock, msg, bout, timeout); } } + + /** + * + */ + private static class GridPingFutureAdapter<R> extends GridFutureAdapter<R> { + /** Socket. */ + private volatile Socket sock; + + /** + * Returns socket associated with this ping future. + * + * @return Socket or {@code null} if no socket associated. + */ + public Socket sock() { + return sock; + } + + /** + * Associates socket with this ping future. + * + * @param sock Socket. + */ + public void sock(Socket sock) { + this.sock = sock; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7635e589/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 18a540c..2f3d410 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -1167,18 +1167,49 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T * @param timeoutHelper Timeout helper. * @return Opened socket. * @throws IOException If failed. 
+ * @throws IgniteSpiOperationTimeoutException In case of timeout. */ protected Socket openSocket(InetSocketAddress sockAddr, IgniteSpiOperationTimeoutHelper timeoutHelper) throws IOException, IgniteSpiOperationTimeoutException { - assert sockAddr != null; + return openSocket(createSocket(), sockAddr, timeoutHelper); + } - InetSocketAddress resolved = sockAddr.isUnresolved() ? - new InetSocketAddress(InetAddress.getByName(sockAddr.getHostName()), sockAddr.getPort()) : sockAddr; + /** + * Connects to remote address sending {@code U.IGNITE_HEADER} when connection is established. + * + * @param sock Socket bound to a local host address. + * @param remAddr Remote address. + * @param timeoutHelper Timeout helper. + * @return Connected socket. + * @throws IOException If failed. + * @throws IgniteSpiOperationTimeoutException In case of timeout. + */ + protected Socket openSocket(Socket sock, InetSocketAddress remAddr, IgniteSpiOperationTimeoutHelper timeoutHelper) + throws IOException, IgniteSpiOperationTimeoutException { + + assert remAddr != null; + + InetSocketAddress resolved = remAddr.isUnresolved() ? + new InetSocketAddress(InetAddress.getByName(remAddr.getHostName()), remAddr.getPort()) : remAddr; InetAddress addr = resolved.getAddress(); assert addr != null; + sock.connect(resolved, (int)timeoutHelper.nextTimeoutChunk(sockTimeout)); + + writeToSocket(sock, U.IGNITE_HEADER, timeoutHelper.nextTimeoutChunk(sockTimeout)); + + return sock; + } + + /** + * Creates socket binding it to a local host address. This operation is not blocking. + * + * @return Created socket. + * @throws IOException If failed. 
+ */ + Socket createSocket() throws IOException { Socket sock; if (isSslEnabled()) @@ -1190,10 +1221,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T sock.setTcpNoDelay(true); - sock.connect(resolved, (int)timeoutHelper.nextTimeoutChunk(sockTimeout)); - - writeToSocket(sock, U.IGNITE_HEADER, timeoutHelper.nextTimeoutChunk(sockTimeout)); - return sock; } @@ -1250,8 +1277,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T * @throws IOException If IO failed or write timed out. * @throws IgniteCheckedException If marshalling failed. */ - protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) - throws IOException, IgniteCheckedException { + protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, + IgniteCheckedException { writeToSocket(sock, msg, new GridByteArrayOutputStream(8 * 1024), timeout); // 8K. } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7635e589/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java index 9a44c24..2b404c7 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java @@ -70,7 +70,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); - TcpDiscoverySpi spi = new TcpDiscoverySpi(); + TcpDiscoverySpi spi = gridName.contains("testPingInterruptedOnNodeFailedFailingNode") ? 
+ new TestTcpDiscoverySpi() : new TcpDiscoverySpi(); discoMap.put(gridName, spi); @@ -128,6 +129,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { if (U.isMacOs()) spi.setLocalAddress(F.first(U.allLocalIps())); } + else if (gridName.contains("testPingInterruptedOnNodeFailedPingingNode")) + cfg.setFailureDetectionTimeout(30_000); return cfg; } @@ -339,6 +342,153 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { /** * @throws Exception If any error occurs. */ + public void testPingInterruptedOnNodeFailed() throws Exception { + try { + final Ignite pingingNode = startGrid("testPingInterruptedOnNodeFailedPingingNode"); + final Ignite failedNode = startGrid("testPingInterruptedOnNodeFailedFailingNode"); + startGrid("testPingInterruptedOnNodeFailedSimpleNode"); + + ((TestTcpDiscoverySpi)failedNode.configuration().getDiscoverySpi()).ignorePingResponse = true; + + final CountDownLatch pingLatch = new CountDownLatch(1); + + final CountDownLatch eventLatch = new CountDownLatch(1); + + final AtomicBoolean pingRes = new AtomicBoolean(true); + + final AtomicBoolean failRes = new AtomicBoolean(false); + + long startTs = System.currentTimeMillis(); + + pingingNode.events().localListen( + new IgnitePredicate<Event>() { + @Override public boolean apply(Event event) { + if (((DiscoveryEvent)event).eventNode().id().equals(failedNode.cluster().localNode().id())) { + failRes.set(true); + eventLatch.countDown(); + } + + return true; + } + }, + EventType.EVT_NODE_FAILED); + + IgniteInternalFuture<?> pingFut = multithreadedAsync( + new Callable<Object>() { + @Override public Object call() throws Exception { + pingLatch.countDown(); + + pingRes.set(pingingNode.configuration().getDiscoverySpi().pingNode( + failedNode.cluster().localNode().id())); + + return null; + } + }, 1); + + IgniteInternalFuture<?> failingFut = multithreadedAsync( + new Callable<Object>() { + @Override public Object call() throws Exception { + pingLatch.await(); + + 
Thread.sleep(3000); + + ((TestTcpDiscoverySpi)failedNode.configuration().getDiscoverySpi()).simulateNodeFailure(); + + return null; + } + }, 1); + + failingFut.get(); + pingFut.get(); + + assertFalse(pingRes.get()); + + assertTrue(System.currentTimeMillis() - startTs < + pingingNode.configuration().getFailureDetectionTimeout() / 2); + + assertTrue(eventLatch.await(7, TimeUnit.SECONDS)); + assertTrue(failRes.get()); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If any error occurs. + */ + public void testPingInterruptedOnNodeLeft() throws Exception { + try { + final Ignite pingingNode = startGrid("testPingInterruptedOnNodeFailedPingingNode"); + final Ignite leftNode = startGrid("testPingInterruptedOnNodeFailedFailingNode"); + startGrid("testPingInterruptedOnNodeFailedSimpleNode"); + + ((TestTcpDiscoverySpi)leftNode.configuration().getDiscoverySpi()).ignorePingResponse = true; + + final CountDownLatch pingLatch = new CountDownLatch(1); + + final AtomicBoolean pingRes = new AtomicBoolean(true); + + long startTs = System.currentTimeMillis(); + + IgniteInternalFuture<?> pingFut = multithreadedAsync( + new Callable<Object>() { + @Override public Object call() throws Exception { + pingLatch.countDown(); + + pingRes.set(pingingNode.configuration().getDiscoverySpi().pingNode( + leftNode.cluster().localNode().id())); + + return null; + } + }, 1); + + IgniteInternalFuture<?> stoppingFut = multithreadedAsync( + new Callable<Object>() { + @Override public Object call() throws Exception { + pingLatch.await(); + + Thread.sleep(3000); + + stopGrid("testPingInterruptedOnNodeFailedFailingNode"); + + return null; + } + }, 1); + + stoppingFut.get(); + pingFut.get(); + + assertFalse(pingRes.get()); + + assertTrue(System.currentTimeMillis() - startTs < + pingingNode.configuration().getFailureDetectionTimeout() / 2); + } + finally { + stopAllGrids(); + } + } + + /** + * + */ + private static class TestTcpDiscoverySpi extends TcpDiscoverySpi { + /** */ + private 
boolean ignorePingResponse; + + /** {@inheritDoc} */ + protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, + IgniteCheckedException { + if (msg instanceof TcpDiscoveryPingResponse && ignorePingResponse) + return; + else + super.writeToSocket(sock, msg, timeout); + } + } + + /** + * @throws Exception If any error occurs. + */ public void testNodeAdded() throws Exception { try { final Ignite g1 = startGrid(1); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7635e589/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java index fbea187..df36644 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java @@ -305,7 +305,8 @@ public class TcpDiscoverySpiFailureTimeoutSelfTest extends AbstractDiscoverySelf /** {@inheritDoc} */ - @Override protected Socket openSocket(InetSocketAddress sockAddr, IgniteSpiOperationTimeoutHelper timeoutHelper) + @Override protected Socket openSocket(Socket sock, InetSocketAddress sockAddr, + IgniteSpiOperationTimeoutHelper timeoutHelper) throws IOException, IgniteSpiOperationTimeoutException { if (openSocketTimeout) { @@ -330,11 +331,12 @@ public class TcpDiscoverySpiFailureTimeoutSelfTest extends AbstractDiscoverySelf } } - Socket sock = super.openSocket(sockAddr, timeoutHelper); + super.openSocket(sock, sockAddr, timeoutHelper); try { Thread.sleep(1500); - } catch (InterruptedException e) { + } + catch (InterruptedException e) { // Ignore }