Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-426 37c3fe77e -> 96e338e49


ignite-1241-dev: fixed endless "failure detection threshold" warnings for the
case when the topology consists of a single server node plus client nodes only


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/38070b28
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/38070b28
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/38070b28

Branch: refs/heads/ignite-426
Commit: 38070b28bdda9e95b125f27706037c9916edeeb6
Parents: 7760847
Author: Denis Magda <dma...@gridgain.com>
Authored: Fri Aug 14 16:20:18 2015 +0300
Committer: Denis Magda <dma...@gridgain.com>
Committed: Fri Aug 14 16:20:18 2015 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 26 ++++++++++++++------
 .../spi/discovery/tcp/TcpDiscoveryImpl.java     |  4 +--
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |  4 ---
 .../tcp/internal/TcpDiscoveryNode.java          | 18 +++++++-------
 .../tcp/internal/TcpDiscoveryNodesRing.java     | 23 +++++++++++++++++
 5 files changed, 53 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/38070b28/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 76144e3..c8c4c50 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -628,9 +628,9 @@ class ServerImpl extends TcpDiscoveryImpl {
     }
 
     /** {@inheritDoc} */
-    @Override protected void onDataReceived() {
+    @Override protected void onMessageExchanged() {
         if (spi.failureDetectionTimeoutEnabled() && locNode != null)
-            locNode.lastDataReceivedTime(U.currentTimeMillis());
+            locNode.lastExchangeTime(U.currentTimeMillis());
     }
 
     /**
@@ -1916,9 +1916,13 @@ class ServerImpl extends TcpDiscoveryImpl {
             if (spi.ensured(msg))
                 msgHist.add(msg);
 
-            if (msg.senderNodeId() != null && 
!msg.senderNodeId().equals(getLocalNodeId()))
-                // Reset the flag.
+            if (msg.senderNodeId() != null && 
!msg.senderNodeId().equals(getLocalNodeId())) {
+                // Received a message from remote node.
+                onMessageExchanged();
+
+                // Reset the failure flag.
                 failureThresholdReached = false;
+            }
 
             spi.stats.onMessageProcessingFinished(msg);
         }
@@ -2278,6 +2282,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                                 int res = spi.readReceipt(sock, 
timeoutHelper.nextTimeoutChunk(ackTimeout0));
 
+                                onMessageExchanged();
+
                                 if (log.isDebugEnabled())
                                     log.debug("Message has been sent to next 
node [msg=" + msg +
                                         ", next=" + next.id() +
@@ -4104,9 +4110,12 @@ class ServerImpl extends TcpDiscoveryImpl {
          * Check connection aliveness status.
          */
         private void checkConnection() {
+            Boolean hasRemoteSrvNodes = null;
+
             if (spi.failureDetectionTimeoutEnabled() && 
!failureThresholdReached &&
-                U.currentTimeMillis() - locNode.lastDataReceivedTime() >= 
connCheckThreshold &&
-                ring.hasRemoteNodes() && spiStateCopy() == CONNECTED) {
+                U.currentTimeMillis() - locNode.lastExchangeTime() >= 
connCheckThreshold &&
+                spiStateCopy() == CONNECTED &&
+                (hasRemoteSrvNodes = ring.hasRemoteServerNodes())) {
 
                 log.info("Local node seems to be disconnected from topology 
(failure detection timeout " +
                     "is reached): [failureDetectionTimeout=" + 
spi.failureDetectionTimeout() +
@@ -4123,7 +4132,10 @@ class ServerImpl extends TcpDiscoveryImpl {
             if (elapsed > 0)
                 return;
 
-            if (ring.hasRemoteNodes()) {
+            if (hasRemoteSrvNodes == null)
+                hasRemoteSrvNodes = ring.hasRemoteServerNodes();
+
+            if (hasRemoteSrvNodes) {
                 sendMessageAcrossRing(new 
TcpDiscoveryConnectionCheckMessage(locNode));
 
                 lastTimeConnCheckMsgSent = U.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/38070b28/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index e25f0b6..14d037d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -132,9 +132,9 @@ abstract class TcpDiscoveryImpl {
     }
 
     /**
-     * Called when a chunk of data is received from a remote node.
+     * Called when the local node has either received a message from or sent a message to a remote node.
      */
-    protected void onDataReceived() {
+    protected void onMessageExchanged() {
         // No-op
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/38070b28/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 18a540c..74dc36c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1371,8 +1371,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter 
implements DiscoverySpi, T
 
             T res = marsh.unmarshal(in == null ? sock.getInputStream() : in, 
U.gridClassLoader());
 
-            impl.onDataReceived();
-
             return res;
         }
         catch (IOException | IgniteCheckedException e) {
@@ -1414,8 +1412,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter 
implements DiscoverySpi, T
             if (res == -1)
                 throw new EOFException();
 
-            impl.onDataReceived();
-
             return res;
         }
         catch (SocketTimeoutException e) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/38070b28/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
index 44e9006..135dc59 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
@@ -89,8 +89,8 @@ public class TcpDiscoveryNode extends 
GridMetadataAwareAdapter implements Cluste
     @GridToStringExclude
     private volatile long lastUpdateTime = U.currentTimeMillis();
 
-    /** The most recent time when a data chunk was received from a node. */
-    private volatile long lastDataReceivedTime = U.currentTimeMillis();
+    /** The most recent time when node exchanged a message with a remote node. 
*/
+    private volatile long lastExchangeTime = U.currentTimeMillis();
 
     /** Metrics provider (transient). */
     @GridToStringExclude
@@ -393,21 +393,21 @@ public class TcpDiscoveryNode extends 
GridMetadataAwareAdapter implements Cluste
     }
 
     /**
-     * Gets the last time a node received a data chunk from a remote node.
+     * Gets the last time a node exchanged a message with a remote node.
      *
      * @return Time in milliseconds.
      */
-    public long lastDataReceivedTime() {
-        return lastDataReceivedTime;
+    public long lastExchangeTime() {
+        return lastExchangeTime;
     }
 
     /**
-     * Sets the last time a node receive a data chunk from a remote node in a 
topology.
+     * Sets the last time a node exchanged a message with a remote node.
      *
-     * @param lastDataReceivedTime Time in milliseconds.
+     * @param lastExchangeTime Time in milliseconds.
      */
-    public void lastDataReceivedTime(long lastDataReceivedTime) {
-        this.lastDataReceivedTime = lastDataReceivedTime;
+    public void lastExchangeTime(long lastExchangeTime) {
+        this.lastExchangeTime = lastExchangeTime;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/38070b28/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
index acb479d..2422e14 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
@@ -152,6 +152,29 @@ public class TcpDiscoveryNodesRing {
     }
 
     /**
+     * Checks whether the topology contains remote server nodes.
+     *
+     * @return {@code true} if the topology contains at least one remote server node.
+     */
+    public boolean hasRemoteServerNodes() {
+        rwLock.readLock().lock();
+
+        try {
+            if (nodes.size() < 2)
+                return false;
+
+            for (TcpDiscoveryNode node : nodes)
+                if (!node.isClient() && !node.id().equals(locNode.id()))
+                    return true;
+
+            return false;
+        }
+        finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    /**
      * Adds node to topology, also initializes node last update time with 
current
      * system time.
      *

Reply via email to