# IGNITE-709 Improve ping from client to server.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/505a03e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/505a03e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/505a03e9

Branch: refs/heads/ignite-836_2
Commit: 505a03e92db2747ed5beb07eb47cce17271d387c
Parents: c05e368
Author: sevdokimov <sevdoki...@gridgain.com>
Authored: Tue May 12 13:43:26 2015 +0300
Committer: sevdokimov <sevdoki...@gridgain.com>
Committed: Tue May 12 13:43:26 2015 +0300

----------------------------------------------------------------------
 .../discovery/tcp/TcpClientDiscoverySpi.java    | 56 +++++++++++++++-
 .../spi/discovery/tcp/TcpDiscoverySpi.java      | 62 ++++++++++++++++--
 .../messages/TcpDiscoveryClientPingRequest.java | 56 ++++++++++++++++
 .../TcpDiscoveryClientPingResponse.java         | 67 ++++++++++++++++++++
 .../tcp/TcpClientDiscoverySpiSelfTest.java      | 30 +++++++++
 5 files changed, 264 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/505a03e9/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
index 6752bf5..d55d1c5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
@@ -20,6 +20,7 @@ package org.apache.ignite.spi.discovery.tcp;
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
@@ -75,6 +76,9 @@ public class TcpClientDiscoverySpi extends 
TcpDiscoverySpiAdapter implements Tcp
     /** Remote nodes. */
     private final ConcurrentMap<UUID, TcpDiscoveryNode> rmtNodes = new 
ConcurrentHashMap8<>();
 
+    /** Pending ping futures, keyed by the ID of the node being pinged. */
+    private final ConcurrentMap<UUID, GridFutureAdapter<Boolean>> pingFuts = 
new ConcurrentHashMap8<>();
+
     /** Socket writer. */
     private SocketWriter sockWriter;
 
@@ -316,6 +320,9 @@ public class TcpClientDiscoverySpi extends 
TcpDiscoverySpiAdapter implements Tcp
             }
         }
 
+        for (GridFutureAdapter<Boolean> fut : pingFuts.values())
+            fut.onDone(false);
+
         rmtNodes.clear();
 
         U.interrupt(sockTimeoutWorker);
@@ -359,15 +366,46 @@ public class TcpClientDiscoverySpi extends 
TcpDiscoverySpiAdapter implements Tcp
     }
 
     /** {@inheritDoc} */
-    @Override public boolean pingNode(UUID nodeId) {
-        assert nodeId != null;
+    @Override public boolean pingNode(@NotNull final UUID nodeId) {
+        if (getSpiContext().isStopping())
+            return false;
 
         if (nodeId.equals(getLocalNodeId()))
             return true;
 
         TcpDiscoveryNode node = rmtNodes.get(nodeId);
 
-        return node != null && node.visible();
+        if (node == null || !node.visible())
+            return false;
+
+        GridFutureAdapter<Boolean> fut = pingFuts.get(nodeId);
+
+        if (fut == null) {
+            fut = new GridFutureAdapter<>();
+
+            GridFutureAdapter<Boolean> oldFut = pingFuts.putIfAbsent(nodeId, 
fut);
+
+            if (oldFut != null)
+                fut = oldFut;
+            else
+                sockWriter.sendMessage(new 
TcpDiscoveryClientPingRequest(getLocalNodeId(), nodeId));
+        }
+
+        final GridFutureAdapter<Boolean> finalFut = fut;
+
+        timer.schedule(new TimerTask() {
+            @Override public void run() {
+                if (pingFuts.remove(nodeId, finalFut))
+                    finalFut.onDone(false);
+            }
+        }, netTimeout);
+
+        try {
+            return fut.get();
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteSpiException(e); // Should never occur
+        }
     }
 
     /** {@inheritDoc} */
@@ -1069,6 +1107,8 @@ public class TcpClientDiscoverySpi extends 
TcpDiscoverySpiAdapter implements Tcp
                 
processClientReconnectMessage((TcpDiscoveryClientReconnectMessage)msg);
             else if (msg instanceof TcpDiscoveryCustomEventMessage)
                 processCustomMessage((TcpDiscoveryCustomEventMessage)msg);
+            else if (msg instanceof TcpDiscoveryClientPingResponse)
+                processClientPingResponse((TcpDiscoveryClientPingResponse)msg);
 
             stats.onMessageProcessingFinished(msg);
         }
@@ -1366,6 +1406,16 @@ public class TcpClientDiscoverySpi extends 
TcpDiscoverySpiAdapter implements Tcp
         }
 
         /**
+         * @param msg Message.
+         */
+        private void processClientPingResponse(TcpDiscoveryClientPingResponse 
msg) {
+            GridFutureAdapter<Boolean> fut = pingFuts.remove(msg.nodeToPing());
+
+            if (fut != null)
+                fut.onDone(msg.result());
+        }
+
+        /**
          * @param nodeId Node ID.
          * @param metrics Metrics.
          * @param cacheMetrics Cache metrics.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/505a03e9/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 3624791..e00f798 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -203,6 +203,10 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
     @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
     private int reconCnt = DFLT_RECONNECT_CNT;
 
+    /** */
+    private final ThreadPoolExecutor utilityPool = new ThreadPoolExecutor(0, 
10, 2000, TimeUnit.MILLISECONDS,
+        new LinkedBlockingQueue<Runnable>());
+
     /** Nodes ring. */
     @GridToStringExclude
     private final TcpDiscoveryNodesRing ring = new TcpDiscoveryNodesRing();
@@ -285,6 +289,10 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
     private final 
CopyOnWriteArrayList<IgniteInClosure<TcpDiscoveryAbstractMessage>> sendMsgLsnrs 
=
         new CopyOnWriteArrayList<>();
 
+    /** */
+    private final CopyOnWriteArrayList<IgniteInClosure<Socket>> 
incomeConnLsnrs =
+        new CopyOnWriteArrayList<>();
+
     /** {@inheritDoc} */
     @IgniteInstanceResource
     @Override public void injectResources(Ignite ignite) {
@@ -2034,15 +2042,29 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
     /**
      * <strong>FOR TEST ONLY!!!</strong>
      */
-    public void 
addSendMessageListener(IgniteInClosure<TcpDiscoveryAbstractMessage> msg) {
-        sendMsgLsnrs.add(msg);
+    public void 
addSendMessageListener(IgniteInClosure<TcpDiscoveryAbstractMessage> lsnr) {
+        sendMsgLsnrs.add(lsnr);
+    }
+
+    /**
+     * <strong>FOR TEST ONLY!!!</strong>
+     */
+    public void 
removeSendMessageListener(IgniteInClosure<TcpDiscoveryAbstractMessage> lsnr) {
+        sendMsgLsnrs.remove(lsnr);
+    }
+
+    /**
+     * <strong>FOR TEST ONLY!!!</strong>
+     */
+    public void addIncomeConnectionListener(IgniteInClosure<Socket> lsnr) {
+        incomeConnLsnrs.add(lsnr);
     }
 
     /**
      * <strong>FOR TEST ONLY!!!</strong>
      */
-    public void 
removeSendMessageListener(IgniteInClosure<TcpDiscoveryAbstractMessage> msg) {
-        sendMsgLsnrs.remove(msg);
+    public void removeIncomeConnectionListener(IgniteInClosure<Socket> lsnr) {
+        incomeConnLsnrs.remove(lsnr);
     }
 
     /**
@@ -2634,6 +2656,9 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
             else if (msg instanceof TcpDiscoveryCustomEventMessage)
                 processCustomMessage((TcpDiscoveryCustomEventMessage)msg);
 
+            else if (msg instanceof TcpDiscoveryClientPingRequest)
+                processClientPingRequest((TcpDiscoveryClientPingRequest)msg);
+
             else
                 assert false : "Unknown message type: " + 
msg.getClass().getSimpleName();
 
@@ -4448,6 +4473,32 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
         /**
          * @param msg Message.
          */
+        private void processClientPingRequest(final 
TcpDiscoveryClientPingRequest msg) {
+            utilityPool.execute(new Runnable() {
+                @Override public void run() {
+                    boolean res = pingNode(msg.nodeToPing());
+
+                    final ClientMessageWorker worker = 
clientMsgWorkers.get(msg.creatorNodeId());
+
+                    if (worker == null) {
+                        if (log.isDebugEnabled())
+                            log.debug("Ping request from dead client node, 
will be skipped: " + msg.creatorNodeId());
+                    }
+                    else {
+                        TcpDiscoveryClientPingResponse pingRes = new 
TcpDiscoveryClientPingResponse(
+                            getLocalNodeId(), msg.nodeToPing(), res);
+
+                        pingRes.verify(getLocalNodeId());
+
+                        worker.addMessage(pingRes);
+                    }
+                }
+            });
+        }
+
+        /**
+         * @param msg Message.
+         */
         private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) {
             if (isLocalNodeCoordinator()) {
                 if (msg.verified()) {
@@ -4643,6 +4694,9 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
 
                     sock.setSoTimeout((int)netTimeout);
 
+                    for (IgniteInClosure<Socket> connLsnr : incomeConnLsnrs)
+                        connLsnr.apply(sock);
+
                     in = new BufferedInputStream(sock.getInputStream());
 
                     byte[] buf = new byte[4];

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/505a03e9/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingRequest.java
new file mode 100644
index 0000000..f9f164d
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingRequest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp.messages;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Ping request.
+ */
+public class TcpDiscoveryClientPingRequest extends TcpDiscoveryAbstractMessage 
{
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** ID of the node to ping. */
+    private final UUID nodeToPing;
+
+    /**
+     * @param creatorNodeId Creator node ID.
+     * @param nodeToPing Pinged client node ID.
+     */
+    public TcpDiscoveryClientPingRequest(UUID creatorNodeId, @Nullable UUID 
nodeToPing) {
+        super(creatorNodeId);
+
+        this.nodeToPing = nodeToPing;
+    }
+
+    /**
+     * @return Pinged client node ID.
+     */
+    @Nullable public UUID nodeToPing() {
+        return nodeToPing;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(TcpDiscoveryClientPingRequest.class, this, "super", 
super.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/505a03e9/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingResponse.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingResponse.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingResponse.java
new file mode 100644
index 0000000..26a2b00
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingResponse.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp.messages;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Ping response.
+ */
+public class TcpDiscoveryClientPingResponse extends 
TcpDiscoveryAbstractMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** ID of the node that was pinged. */
+    private final UUID nodeToPing;
+
+    /** */
+    private final boolean res;
+
+    /**
+     * @param creatorNodeId Creator node ID.
+     * @param nodeToPing Pinged client node ID.
+     */
+    public TcpDiscoveryClientPingResponse(UUID creatorNodeId, @Nullable UUID 
nodeToPing, boolean res) {
+        super(creatorNodeId);
+
+        this.nodeToPing = nodeToPing;
+        this.res = res;
+    }
+
+    /**
+     * @return Pinged client node ID.
+     */
+    @Nullable public UUID nodeToPing() {
+        return nodeToPing;
+    }
+
+    /**
+     * @return Result of ping.
+     */
+    public boolean result() {
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(TcpDiscoveryClientPingResponse.class, this, "super", 
super.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/505a03e9/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index a06bfd9..49ef4aa 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -353,6 +353,36 @@ public class TcpClientDiscoverySpiSelfTest extends 
GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testPingFailedNodeFromClient() throws Exception {
+        startServerNodes(2);
+        startClientNodes(1);
+
+        Ignite srv0 = G.ignite("server-0");
+        Ignite srv1 = G.ignite("server-1");
+        Ignite client = G.ignite("client-0");
+
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        
((TcpDiscoverySpi)srv1.configuration().getDiscoverySpi()).addIncomeConnectionListener(new
 IgniteInClosure<Socket>() {
+            @Override public void apply(Socket sock) {
+                try {
+                    latch.await();
+                }
+                catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        });
+
+        assert 
((IgniteEx)client).context().discovery().pingNode(srv0.cluster().localNode().id());
+        assert 
!((IgniteEx)client).context().discovery().pingNode(srv1.cluster().localNode().id());
+
+        latch.countDown();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testClientReconnectOnRouterFail() throws Exception {
         clientsPerSrv = 1;
 

Reply via email to