# IGNITE-709 Improve ping from server to client.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a001312d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a001312d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a001312d

Branch: refs/heads/ignite-836_2
Commit: a001312d3a6b1175be4ef98e6113ea6eaf179246
Parents: 505a03e
Author: sevdokimov <sevdoki...@gridgain.com>
Authored: Tue May 12 16:30:34 2015 +0300
Committer: sevdokimov <sevdoki...@gridgain.com>
Committed: Tue May 12 16:30:34 2015 +0300

----------------------------------------------------------------------
 .../internal/util/future/SettableFuture.java    |  86 ++++++++++++
 .../discovery/tcp/TcpClientDiscoverySpi.java    |  11 ++
 .../spi/discovery/tcp/TcpDiscoverySpi.java      | 138 ++++++++++++++++---
 .../tcp/TcpClientDiscoverySpiSelfTest.java      |  37 ++++-
 4 files changed, 248 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a001312d/modules/core/src/main/java/org/apache/ignite/internal/util/future/SettableFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/SettableFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/SettableFuture.java
new file mode 100644
index 0000000..673b6b6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/SettableFuture.java
@@ -0,0 +1,86 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.internal.util.future;
+
+import java.util.concurrent.*;
+
+/**
+ * Simple {@link Future} implementation that is completed explicitly via {@link #set} or {@link #setException}.
+ */
+public class SettableFuture<T> implements Future<T> {
+    /** Latch that is released once a result or an exception has been set. */
+    private final CountDownLatch latch = new CountDownLatch(1);
+
+    /** Result of computation. */
+    private T res;
+
+    /** Exception thrown during the computation, wrapped into an {@link ExecutionException}. */
+    private ExecutionException err;
+
+    /** {@inheritDoc} Cancellation is not supported: always throws {@link UnsupportedOperationException}. */
+    @Override public boolean cancel(boolean mayInterruptIfRunning) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} Always {@code false}. */
+    @Override public boolean isCancelled() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isDone() {
+        return latch.getCount() == 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public T get() throws InterruptedException, ExecutionException {
+        latch.await();
+
+        if (err != null)
+            throw err;
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public T get(long timeout, TimeUnit unit) throws 
InterruptedException, ExecutionException,
+        TimeoutException {
+
+        if (!latch.await(timeout, unit))
+            throw new TimeoutException();
+
+        if (err != null)
+            throw err;
+
+        return res;
+    }
+
+    /**
+     * Completes the future successfully: stores the result and releases the latch the {@code get} methods wait on.
+     *
+     * @param res Result of computation.
+     */
+    public void set(T res) {
+        this.res = res;
+
+        latch.countDown();
+    }
+
+    /**
+     * Completes the future with a failure: the {@code get} methods will throw an {@link ExecutionException}.
+     *
+     * @param throwable Cause of the failure; becomes the cause of the thrown {@link ExecutionException}.
+     */
+    public void setException(Throwable throwable) {
+        err = new ExecutionException(throwable);
+
+        latch.countDown();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a001312d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
index d55d1c5..d1446a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
@@ -1109,6 +1109,8 @@ public class TcpClientDiscoverySpi extends 
TcpDiscoverySpiAdapter implements Tcp
                 processCustomMessage((TcpDiscoveryCustomEventMessage)msg);
             else if (msg instanceof TcpDiscoveryClientPingResponse)
                 processClientPingResponse((TcpDiscoveryClientPingResponse)msg);
+            else if (msg instanceof TcpDiscoveryPingRequest)
+                processPingRequest((TcpDiscoveryPingRequest)msg);
 
             stats.onMessageProcessingFinished(msg);
         }
@@ -1416,6 +1418,15 @@ public class TcpClientDiscoverySpi extends 
TcpDiscoverySpiAdapter implements Tcp
         }
 
         /**
+         * Router wants to ping this client: replies with a ping response carrying the local node ID.
+         *
+         * @param msg Ping request.
+         */
+        private void processPingRequest(TcpDiscoveryPingRequest msg) {
+            sockWriter.sendMessage(new 
TcpDiscoveryPingResponse(getLocalNodeId()));
+        }
+
+        /**
          * @param nodeId Node ID.
          * @param metrics Metrics.
          * @param cacheMetrics Cache metrics.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a001312d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index e00f798..f9c6130 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -51,6 +51,7 @@ import java.net.*;
 import java.text.*;
 import java.util.*;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
 
 import static org.apache.ignite.events.EventType.*;
 import static org.apache.ignite.internal.IgniteNodeAttributes.*;
@@ -204,7 +205,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter 
implements TcpDiscov
     private int reconCnt = DFLT_RECONNECT_CNT;
 
     /** */
-    private final ThreadPoolExecutor utilityPool = new ThreadPoolExecutor(0, 
10, 2000, TimeUnit.MILLISECONDS,
+    private final Executor utilityPool = new ThreadPoolExecutor(0, 10, 2000, 
TimeUnit.MILLISECONDS,
         new LinkedBlockingQueue<Runnable>());
 
     /** Nodes ring. */
@@ -1143,8 +1144,28 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
 
         UUID locNodeId = getLocalNodeId();
 
-        if (F.contains(locNodeAddrs, addr))
-            return F.t(getLocalNodeId(), clientNodeId != null && 
clientMsgWorkers.containsKey(clientNodeId));
+        if (F.contains(locNodeAddrs, addr)) {
+            if (clientNodeId == null)
+                return F.t(getLocalNodeId(), false);
+
+            ClientMessageWorker clientWorker = 
clientMsgWorkers.get(clientNodeId);
+
+            if (clientWorker == null)
+                return F.t(getLocalNodeId(), false);
+
+            boolean clientPingRes;
+
+            try {
+                clientPingRes = clientWorker.ping();
+            }
+            catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+
+                throw new IgniteInterruptedCheckedException(e);
+            }
+
+            return F.t(getLocalNodeId(), clientPingRes);
+        }
 
         GridFutureAdapter<IgniteBiTuple<UUID, Boolean>> fut = new 
GridFutureAdapter<>();
 
@@ -2659,6 +2680,9 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
             else if (msg instanceof TcpDiscoveryClientPingRequest)
                 processClientPingRequest((TcpDiscoveryClientPingRequest)msg);
 
+            else if (msg instanceof TcpDiscoveryPingResponse)
+                processPingResponse((TcpDiscoveryPingResponse)msg);
+
             else
                 assert false : "Unknown message type: " + 
msg.getClass().getSimpleName();
 
@@ -4499,6 +4523,16 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
         /**
          * @param msg Message.
          */
+        private void processPingResponse(final TcpDiscoveryPingResponse msg) {
+            ClientMessageWorker clientWorker = 
clientMsgWorkers.get(msg.creatorNodeId());
+
+            if (clientWorker != null) // No worker registered for the responding node: nothing to complete.
+                clientWorker.pingResult(true); // A response arrived: report the pending ping as successful.
+        }
+
+        /**
+         * @param msg Message.
+         */
         private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) {
             if (isLocalNodeCoordinator()) {
                 if (msg.verified()) {
@@ -4660,9 +4694,6 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
         /** */
         private volatile UUID nodeId;
 
-        /** */
-        private volatile boolean client;
-
         /**
          * Constructor.
          *
@@ -4682,6 +4713,8 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
         @Override protected void body() throws InterruptedException {
             UUID locNodeId = getLocalNodeId();
 
+            ClientMessageWorker clientMsgWrk = null;
+
             try {
                 InputStream in;
 
@@ -4746,8 +4779,12 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
 
                             TcpDiscoveryPingResponse res = new 
TcpDiscoveryPingResponse(locNodeId);
 
-                            if (req.clientNodeId() != null)
-                                
res.clientExists(clientMsgWorkers.containsKey(req.clientNodeId()));
+                            if (req.clientNodeId() != null) {
+                                ClientMessageWorker clientWorker = 
clientMsgWorkers.get(req.clientNodeId());
+
+                                if (clientWorker != null)
+                                    res.clientExists(clientWorker.ping());
+                            }
 
                             writeToSocket(sock, res);
                         }
@@ -4761,10 +4798,8 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
                     TcpDiscoveryHandshakeRequest req = 
(TcpDiscoveryHandshakeRequest)msg;
 
                     UUID nodeId = req.creatorNodeId();
-                    boolean client = req.client();
 
                     this.nodeId = nodeId;
-                    this.client = client;
 
                     TcpDiscoveryHandshakeResponse res =
                         new TcpDiscoveryHandshakeResponse(locNodeId, 
locNode.internalOrder());
@@ -4774,7 +4809,7 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
                     // It can happen if a remote node is stopped and it has a 
loopback address in the list of addresses,
                     // the local node sends a handshake request message on the 
loopback address, so we get here.
                     if (locNodeId.equals(nodeId)) {
-                        assert !client;
+                        assert !req.client();
 
                         if (log.isDebugEnabled())
                             log.debug("Handshake request from local node: " + 
req);
@@ -4782,12 +4817,12 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
                         return;
                     }
 
-                    if (client) {
+                    if (req.client()) {
                         if (log.isDebugEnabled())
                             log.debug("Created client message worker 
[locNodeId=" + locNodeId +
                                 ", rmtNodeId=" + nodeId + ", sock=" + sock + 
']');
 
-                        ClientMessageWorker clientMsgWrk = new 
ClientMessageWorker(sock, nodeId);
+                        clientMsgWrk = new ClientMessageWorker(sock, nodeId);
 
                         clientMsgWrk.start();
 
@@ -4796,11 +4831,11 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
 
                     if (log.isDebugEnabled())
                         log.debug("Initialized connection with remote node 
[nodeId=" + nodeId +
-                            ", client=" + client + ']');
+                            ", client=" + req.client() + ']');
 
                     if (debugMode)
                         debugLog("Initialized connection with remote node 
[nodeId=" + nodeId +
-                            ", client=" + client + ']');
+                            ", client=" + req.client() + ']');
                 }
                 catch (IOException e) {
                     if (log.isDebugEnabled())
@@ -4881,7 +4916,7 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
                             if (!req.responded()) {
                                 boolean ok = processJoinRequestMessage(req);
 
-                                if (client && ok)
+                                if (clientMsgWrk != null && ok)
                                     continue;
                                 else
                                     // Direct join request - no need to handle 
this socket anymore.
@@ -4889,7 +4924,7 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
                             }
                         }
                         else if (msg instanceof 
TcpDiscoveryClientReconnectMessage) {
-                            if (client) {
+                            if (clientMsgWrk != null) {
                                 TcpDiscoverySpiState state = spiStateCopy();
 
                                 if (state == CONNECTED) {
@@ -5026,7 +5061,7 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
                         msgWorker.addMessage(msg);
 
                         // Send receipt back.
-                        if (!client)
+                        if (clientMsgWrk == null)
                             writeToSocket(sock, RES_OK);
                     }
                     catch (IgniteCheckedException e) {
@@ -5080,12 +5115,14 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
                 }
             }
             finally {
-                if (client) {
+                if (clientMsgWrk != null) {
                     if (log.isDebugEnabled())
                         log.debug("Client connection failed [sock=" + sock + 
", locNodeId=" + locNodeId +
                             ", rmtNodeId=" + nodeId + ']');
 
-                    U.interrupt(clientMsgWorkers.remove(nodeId));
+                    clientMsgWorkers.remove(nodeId, clientMsgWrk);
+
+                    U.interrupt(clientMsgWrk);
                 }
 
                 U.closeQuiet(sock);
@@ -5238,6 +5275,9 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
         /** Current client metrics. */
         private volatile ClusterMetrics metrics;
 
+        /** */
+        private final AtomicReference<SettableFuture<Boolean>> pingFut = new 
AtomicReference<>();
+
         /**
          * @param sock Socket.
          * @param nodeId Node ID.
@@ -5300,16 +5340,72 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
                 onException("Client connection failed [sock=" + sock + ", 
locNodeId="
                     + getLocalNodeId() + ", rmtNodeId=" + nodeId + ", msg=" + 
msg + ']', e);
 
-                U.interrupt(clientMsgWorkers.remove(nodeId));
+                clientMsgWorkers.remove(nodeId, this);
+
+                U.interrupt(this);
 
                 U.closeQuiet(sock);
             }
         }
 
+        /**
+         * Completes the pending ping future, if any, with {@code res} and clears it so that a new ping can be started.
+         */
+        public void pingResult(boolean res) {
+            SettableFuture<Boolean> fut = pingFut.getAndSet(null);
+
+            if (fut != null)
+                fut.set(res);
+        }
+
+        /**
+         * Pings the client and waits up to {@code ackTimeout} ms for the result; {@code false} on timeout or if the node is stopping.
+         */
+        public boolean ping() throws InterruptedException {
+            if (isNodeStopping())
+                return false;
+
+            SettableFuture<Boolean> fut;
+
+            while (true) {
+                fut = pingFut.get();
+
+                if (fut != null) // A ping is already pending: wait on its future instead of sending another request.
+                    break;
+
+                fut = new SettableFuture<>();
+
+                if (pingFut.compareAndSet(null, fut)) { // This thread installed the future, so it adds the request.
+                    TcpDiscoveryPingRequest pingReq = new 
TcpDiscoveryPingRequest(getLocalNodeId(), nodeId);
+
+                    pingReq.verify(getLocalNodeId());
+
+                    addMessage(pingReq);
+
+                    break;
+                }
+            }
+
+            try {
+                return fut.get(ackTimeout, TimeUnit.MILLISECONDS);
+            }
+            catch (ExecutionException e) {
+                throw new IgniteSpiException("Internal error: ping future 
cannot be done with exception", e);
+            }
+            catch (TimeoutException ignored) {
+                if (pingFut.compareAndSet(fut, null)) // Still the pending future: clear it and complete it with false.
+                    fut.set(false);
+
+                return false;
+            }
+        }
+
         /** {@inheritDoc} */
         @Override protected void cleanup() {
             super.cleanup();
 
+            pingResult(false);
+
             U.closeQuiet(sock);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a001312d/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index 49ef4aa..507b3e7 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -383,6 +383,30 @@ public class TcpClientDiscoverySpiSelfTest extends 
GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testPingFailedClientNode() throws Exception {
+        startServerNodes(2);
+        startClientNodes(1);
+
+        Ignite srv0 = G.ignite("server-0");
+        Ignite srv1 = G.ignite("server-1");
+        Ignite client = G.ignite("client-0");
+
+        
((TcpDiscoverySpiAdapter)srv0.configuration().getDiscoverySpi()).setAckTimeout(1000);
+        // Pause the client's socket writes (presumably so that it cannot answer ping requests).
+        
((TestTcpClientDiscovery)client.configuration().getDiscoverySpi()).pauseSocketWrite();
+        // While writes are paused, pinging the client from either server must fail.
+        assert 
!((IgniteEx)srv1).context().discovery().pingNode(client.cluster().localNode().id());
+        assert 
!((IgniteEx)srv0).context().discovery().pingNode(client.cluster().localNode().id());
+
+        
((TestTcpClientDiscovery)client.configuration().getDiscoverySpi()).resumeAll();
+        // After resuming, the same pings must succeed.
+        assert 
((IgniteEx)srv1).context().discovery().pingNode(client.cluster().localNode().id());
+        assert 
((IgniteEx)srv0).context().discovery().pingNode(client.cluster().localNode().id());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testClientReconnectOnRouterFail() throws Exception {
         clientsPerSrv = 1;
 
@@ -461,7 +485,7 @@ public class TcpClientDiscoverySpiSelfTest extends 
GridCommonAbstractTest {
 
         clientLeftLatch = new CountDownLatch(1);
 
-        
((TestTcpClientDiscovery)G.ignite("client-1").configuration().getDiscoverySpi()).resume();
+        
((TestTcpClientDiscovery)G.ignite("client-1").configuration().getDiscoverySpi()).resumeAll();
 
         await(clientLeftLatch);
 
@@ -1042,7 +1066,14 @@ public class TcpClientDiscoverySpiSelfTest extends 
GridCommonAbstractTest {
         /**
          *
          */
-        private void pauseAll() {
+        public void pauseSocketWrite() {
+            pauseResumeOperation(true, writeLock); // Only writeLock: unlike pauseAll(), openSockLock is untouched and brokeConnection() is not called.
+        }
+
+        /**
+         *
+         */
+        public void pauseAll() {
             pauseResumeOperation(true, openSockLock, writeLock);
 
             brokeConnection();
@@ -1051,7 +1082,7 @@ public class TcpClientDiscoverySpiSelfTest extends 
GridCommonAbstractTest {
         /**
          *
          */
-        private void resume() {
+        public void resumeAll() {
             pauseResumeOperation(false, openSockLock, writeLock);
         }
     }

Reply via email to