Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-1229 2159d79e2 -> 23dc6558e


ignite-1229: interrupt ping queries if remote node leaves or fails


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4381bf7f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4381bf7f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4381bf7f

Branch: refs/heads/ignite-1229
Commit: 4381bf7fbbcf6faa906cc9ed465d3feb2a302224
Parents: 2159d79
Author: Denis Magda <dma...@gridgain.com>
Authored: Thu Aug 13 15:54:58 2015 +0300
Committer: Denis Magda <dma...@gridgain.com>
Committed: Thu Aug 13 15:54:58 2015 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 57 ++++++++++++++++++--
 .../spi/discovery/tcp/TcpDiscoverySpi.java      | 41 +++++++++++---
 2 files changed, 87 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4381bf7f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 76144e3..1f0266e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -114,7 +114,7 @@ class ServerImpl extends TcpDiscoveryImpl {
     protected TcpDiscoverySpiState spiState = DISCONNECTED;
 
     /** Map with proceeding ping requests. */
-    private final ConcurrentMap<InetSocketAddress, 
IgniteInternalFuture<IgniteBiTuple<UUID, Boolean>>> pingMap =
+    private final ConcurrentMap<InetSocketAddress, 
GridPingFutureAdapter<IgniteBiTuple<UUID, Boolean>>> pingMap =
         new ConcurrentHashMap8<>();
 
     /**
@@ -497,9 +497,9 @@ class ServerImpl extends TcpDiscoveryImpl {
             return F.t(getLocalNodeId(), clientPingRes);
         }
 
-        GridFutureAdapter<IgniteBiTuple<UUID, Boolean>> fut = new 
GridFutureAdapter<>();
+        GridPingFutureAdapter<IgniteBiTuple<UUID, Boolean>> fut = new 
GridPingFutureAdapter<>();
 
-        IgniteInternalFuture<IgniteBiTuple<UUID, Boolean>> oldFut = 
pingMap.putIfAbsent(addr, fut);
+        GridPingFutureAdapter<IgniteBiTuple<UUID, Boolean>> oldFut = 
pingMap.putIfAbsent(addr, fut);
 
         if (oldFut != null)
             return oldFut.get();
@@ -520,7 +520,11 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                         long tstamp = U.currentTimeMillis();
 
-                        sock = spi.openSocket(addr, timeoutHelper);
+                        sock = spi.createSocket();
+
+                        fut.sock = sock;
+
+                        sock = spi.openSocket(sock, addr, timeoutHelper);
 
                         openedSock = true;
 
@@ -597,6 +601,21 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
     }
 
+    /**
+     * Interrupts all existed 'ping' request for the given node.
+     *
+     * @param node Node that may be pinged.
+     */
+    private void interruptPing(TcpDiscoveryNode node) {
+        for (InetSocketAddress addr : spi.getNodeAddresses(node)) {
+            GridPingFutureAdapter fut = pingMap.get(addr);
+
+            if (fut != null && fut.sock != null)
+                // Reference to the socket is not set to null. No need to 
assign it to a local variable.
+                U.closeQuiet(fut.sock);
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public void disconnect() throws IgniteSpiException {
         spiStop0(true);
@@ -3366,6 +3385,8 @@ class ServerImpl extends TcpDiscoveryImpl {
             if (msg.verified() && !locNodeId.equals(leavingNodeId)) {
                 TcpDiscoveryNode leftNode = ring.removeNode(leavingNodeId);
 
+                interruptPing(leavingNode);
+
                 assert leftNode != null;
 
                 if (log.isDebugEnabled())
@@ -3533,6 +3554,8 @@ class ServerImpl extends TcpDiscoveryImpl {
             if (msg.verified()) {
                 node = ring.removeNode(nodeId);
 
+                interruptPing(node);
+
                 assert node != null;
 
                 long topVer;
@@ -5142,4 +5165,30 @@ class ServerImpl extends TcpDiscoveryImpl {
             spi.writeToSocket(sock, msg, bout, timeout);
         }
     }
+
+    /**
+     *
+     */
+    private static class GridPingFutureAdapter<R> extends GridFutureAdapter<R> 
{
+        /** Socket. */
+        private Socket sock;
+
+        /**
+         * Returns socket associated with this ping future.
+         *
+         * @return Socket or {@code null} if no socket associated.
+         */
+        public Socket sock() {
+            return sock;
+        }
+
+        /**
+         * Associates socket with this ping futer.
+         *
+         * @param sock Socket.
+         */
+        public void sock(Socket sock) {
+            this.sock = sock;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4381bf7f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 18a540c..65ab8fd 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1167,18 +1167,49 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter 
implements DiscoverySpi, T
      * @param timeoutHelper Timeout helper.
      * @return Opened socket.
      * @throws IOException If failed.
+     * @throws IgniteSpiOperationTimeoutException In case of timeout.
      */
     protected Socket openSocket(InetSocketAddress sockAddr, 
IgniteSpiOperationTimeoutHelper timeoutHelper)
         throws IOException, IgniteSpiOperationTimeoutException {
-        assert sockAddr != null;
+        return openSocket(createSocket(), sockAddr, timeoutHelper);
+    }
+
+    /**
+     * Connects to remote address sending {@code U.IGNITE_HEADER} when 
connection is established.
+     *
+     * @param sock Socket bound to a local host address.
+     * @param remAddr Remote address.
+     * @param timeoutHelper Timeout helper.
+     * @return Connected socket.
+     * @throws IOException If failed.
+     * @throws IgniteSpiOperationTimeoutException In case of timeout.
+     */
+    Socket openSocket(Socket sock, InetSocketAddress remAddr, 
IgniteSpiOperationTimeoutHelper timeoutHelper)
+        throws IOException, IgniteSpiOperationTimeoutException {
+
+        assert remAddr != null;
 
-        InetSocketAddress resolved = sockAddr.isUnresolved() ?
-            new 
InetSocketAddress(InetAddress.getByName(sockAddr.getHostName()), 
sockAddr.getPort()) : sockAddr;
+        InetSocketAddress resolved = remAddr.isUnresolved() ?
+            new 
InetSocketAddress(InetAddress.getByName(remAddr.getHostName()), 
remAddr.getPort()) : remAddr;
 
         InetAddress addr = resolved.getAddress();
 
         assert addr != null;
 
+        sock.connect(resolved, 
(int)timeoutHelper.nextTimeoutChunk(sockTimeout));
+
+        writeToSocket(sock, U.IGNITE_HEADER, 
timeoutHelper.nextTimeoutChunk(sockTimeout));
+
+        return sock;
+    }
+
+    /**
+     * Creates socket binding it to a local host address. This operation is 
not blocking.
+     *
+     * @return Created socket.
+     * @throws IOException If failed.
+     */
+    Socket createSocket() throws IOException {
         Socket sock;
 
         if (isSslEnabled())
@@ -1190,10 +1221,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter 
implements DiscoverySpi, T
 
         sock.setTcpNoDelay(true);
 
-        sock.connect(resolved, 
(int)timeoutHelper.nextTimeoutChunk(sockTimeout));
-
-        writeToSocket(sock, U.IGNITE_HEADER, 
timeoutHelper.nextTimeoutChunk(sockTimeout));
-
         return sock;
     }
 

Reply via email to