Repository: incubator-ignite Updated Branches: refs/heads/ignite-1229 2159d79e2 -> 23dc6558e
ignite-1229: interrupt ping queries if remote node leaves or fails Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4381bf7f Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4381bf7f Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4381bf7f Branch: refs/heads/ignite-1229 Commit: 4381bf7fbbcf6faa906cc9ed465d3feb2a302224 Parents: 2159d79 Author: Denis Magda <dma...@gridgain.com> Authored: Thu Aug 13 15:54:58 2015 +0300 Committer: Denis Magda <dma...@gridgain.com> Committed: Thu Aug 13 15:54:58 2015 +0300 ---------------------------------------------------------------------- .../ignite/spi/discovery/tcp/ServerImpl.java | 57 ++++++++++++++++++-- .../spi/discovery/tcp/TcpDiscoverySpi.java | 41 +++++++++++--- 2 files changed, 87 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4381bf7f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 76144e3..1f0266e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -114,7 +114,7 @@ class ServerImpl extends TcpDiscoveryImpl { protected TcpDiscoverySpiState spiState = DISCONNECTED; /** Map with proceeding ping requests. 
*/ - private final ConcurrentMap<InetSocketAddress, IgniteInternalFuture<IgniteBiTuple<UUID, Boolean>>> pingMap = + private final ConcurrentMap<InetSocketAddress, GridPingFutureAdapter<IgniteBiTuple<UUID, Boolean>>> pingMap = new ConcurrentHashMap8<>(); /** @@ -497,9 +497,9 @@ class ServerImpl extends TcpDiscoveryImpl { return F.t(getLocalNodeId(), clientPingRes); } - GridFutureAdapter<IgniteBiTuple<UUID, Boolean>> fut = new GridFutureAdapter<>(); + GridPingFutureAdapter<IgniteBiTuple<UUID, Boolean>> fut = new GridPingFutureAdapter<>(); - IgniteInternalFuture<IgniteBiTuple<UUID, Boolean>> oldFut = pingMap.putIfAbsent(addr, fut); + GridPingFutureAdapter<IgniteBiTuple<UUID, Boolean>> oldFut = pingMap.putIfAbsent(addr, fut); if (oldFut != null) return oldFut.get(); @@ -520,7 +520,11 @@ class ServerImpl extends TcpDiscoveryImpl { long tstamp = U.currentTimeMillis(); - sock = spi.openSocket(addr, timeoutHelper); + sock = spi.createSocket(); + + fut.sock = sock; + + sock = spi.openSocket(sock, addr, timeoutHelper); openedSock = true; @@ -597,6 +601,21 @@ class ServerImpl extends TcpDiscoveryImpl { } } + /** + * Interrupts all existing 'ping' requests for the given node. + * + * @param node Node that may be pinged. + */ + private void interruptPing(TcpDiscoveryNode node) { + for (InetSocketAddress addr : spi.getNodeAddresses(node)) { + GridPingFutureAdapter fut = pingMap.get(addr); + + if (fut != null && fut.sock != null) + // Reference to the socket is not set to null. No need to assign it to a local variable. 
+ U.closeQuiet(fut.sock); + } + } + /** {@inheritDoc} */ @Override public void disconnect() throws IgniteSpiException { spiStop0(true); @@ -3366,6 +3385,8 @@ class ServerImpl extends TcpDiscoveryImpl { if (msg.verified() && !locNodeId.equals(leavingNodeId)) { TcpDiscoveryNode leftNode = ring.removeNode(leavingNodeId); + interruptPing(leavingNode); + assert leftNode != null; if (log.isDebugEnabled()) @@ -3533,6 +3554,8 @@ class ServerImpl extends TcpDiscoveryImpl { if (msg.verified()) { node = ring.removeNode(nodeId); + interruptPing(node); + assert node != null; long topVer; @@ -5142,4 +5165,30 @@ class ServerImpl extends TcpDiscoveryImpl { spi.writeToSocket(sock, msg, bout, timeout); } } + + /** + * + */ + private static class GridPingFutureAdapter<R> extends GridFutureAdapter<R> { + /** Socket. */ + private Socket sock; + + /** + * Returns socket associated with this ping future. + * + * @return Socket or {@code null} if no socket associated. + */ + public Socket sock() { + return sock; + } + + /** + * Associates socket with this ping future. + * + * @param sock Socket. + */ + public void sock(Socket sock) { + this.sock = sock; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4381bf7f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 18a540c..65ab8fd 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -1167,18 +1167,49 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T * @param timeoutHelper Timeout helper. * @return Opened socket. * @throws IOException If failed. 
+ * @throws IgniteSpiOperationTimeoutException In case of timeout. */ protected Socket openSocket(InetSocketAddress sockAddr, IgniteSpiOperationTimeoutHelper timeoutHelper) throws IOException, IgniteSpiOperationTimeoutException { - assert sockAddr != null; + return openSocket(createSocket(), sockAddr, timeoutHelper); + } + + /** + * Connects to remote address sending {@code U.IGNITE_HEADER} when connection is established. + * + * @param sock Socket bound to a local host address. + * @param remAddr Remote address. + * @param timeoutHelper Timeout helper. + * @return Connected socket. + * @throws IOException If failed. + * @throws IgniteSpiOperationTimeoutException In case of timeout. + */ + Socket openSocket(Socket sock, InetSocketAddress remAddr, IgniteSpiOperationTimeoutHelper timeoutHelper) + throws IOException, IgniteSpiOperationTimeoutException { + + assert remAddr != null; - InetSocketAddress resolved = sockAddr.isUnresolved() ? - new InetSocketAddress(InetAddress.getByName(sockAddr.getHostName()), sockAddr.getPort()) : sockAddr; + InetSocketAddress resolved = remAddr.isUnresolved() ? + new InetSocketAddress(InetAddress.getByName(remAddr.getHostName()), remAddr.getPort()) : remAddr; InetAddress addr = resolved.getAddress(); assert addr != null; + sock.connect(resolved, (int)timeoutHelper.nextTimeoutChunk(sockTimeout)); + + writeToSocket(sock, U.IGNITE_HEADER, timeoutHelper.nextTimeoutChunk(sockTimeout)); + + return sock; + } + + /** + * Creates socket binding it to a local host address. This operation is not blocking. + * + * @return Created socket. + * @throws IOException If failed. 
+ */ + Socket createSocket() throws IOException { Socket sock; if (isSslEnabled()) @@ -1190,10 +1221,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T sock.setTcpNoDelay(true); - sock.connect(resolved, (int)timeoutHelper.nextTimeoutChunk(sockTimeout)); - - writeToSocket(sock, U.IGNITE_HEADER, timeoutHelper.nextTimeoutChunk(sockTimeout)); - return sock; }