ignite-366 Metrics for caches should work in clustered mode

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/6e5a73db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/6e5a73db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/6e5a73db

Branch: refs/heads/ignite-366
Commit: 6e5a73db08a8589ecad88acc810da28a176ffaf1
Parents: e1540d5
Author: agura <ag...@gridgain.com>
Authored: Wed Apr 8 02:57:40 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Wed Apr 8 02:57:40 2015 +0300

----------------------------------------------------------------------
 .../discovery/GridDiscoveryManager.java         | 14 +++--
 .../discovery/tcp/TcpClientDiscoverySpi.java    | 45 +++++---------
 .../spi/discovery/tcp/TcpDiscoverySpi.java      | 62 ++++++++------------
 .../tcp/internal/TcpDiscoveryNode.java          | 12 ++--
 .../CacheMetricsForClusterGroupSelfTest.java    | 14 +++--
 5 files changed, 69 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6e5a73db/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 88e3cc9..fe61a91 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -654,13 +654,19 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
                 if (F.isEmpty(caches))
                     return Collections.emptyMap();
 
-                Map<Integer, CacheMetrics> metrics = 
U.newHashMap(caches.size());
+                Map<Integer, CacheMetrics> metrics = null;
+
+                for (GridCacheAdapter<?, ?> cache : caches) {
+                    if (cache.configuration().isStatisticsEnabled()) {
+                        if (metrics == null) {
+                            metrics = U.newHashMap(caches.size());
+                        }
 
-                for (GridCacheAdapter<?, ?> cache : caches)
-                    if (cache.configuration().isStatisticsEnabled())
                         metrics.put(cache.context().cacheId(), 
cache.metrics());
+                    }
+                }
 
-                return metrics;
+                return metrics == null ? Collections.<Integer, 
CacheMetrics>emptyMap() : metrics;
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6e5a73db/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
index e548488..5d8a285 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
@@ -1103,19 +1103,19 @@ public class TcpClientDiscoverySpi extends 
TcpDiscoverySpiAdapter implements Tcp
 
                 if (msg.hasMetrics()) {
                     for (Map.Entry<UUID, MetricsSet> e : 
msg.metrics().entrySet()) {
+                        UUID nodeId = e.getKey();
+
                         MetricsSet metricsSet = e.getValue();
 
-                        updateMetrics(e.getKey(), metricsSet.metrics(), 
tstamp);
+                        Map<Integer, CacheMetrics> cacheMetrics = 
msg.hasCacheMetrics() ?
+                                msg.cacheMetrics().get(nodeId) : 
Collections.<Integer, CacheMetrics>emptyMap();
+
+                        updateMetrics(nodeId, metricsSet.metrics(), 
cacheMetrics, tstamp);
 
                         for (T2<UUID, ClusterMetrics> t : 
metricsSet.clientMetrics())
-                            updateMetrics(t.get1(), t.get2(), tstamp);
+                            updateMetrics(t.get1(), t.get2(), cacheMetrics, 
tstamp);
                     }
                 }
-
-                if (msg.hasCacheMetrics()) {
-                    for (Map.Entry<UUID, Map<Integer, CacheMetrics>> e : 
msg.cacheMetrics().entrySet())
-                        updateCacheMetrics(e.getKey(), e.getValue(), tstamp);
-                }
             }
         }
 
@@ -1161,37 +1161,22 @@ public class TcpClientDiscoverySpi extends 
TcpDiscoverySpiAdapter implements Tcp
         /**
          * @param nodeId Node ID.
          * @param metrics Metrics.
+         * @param cacheMetrics Cache metrics.
          * @param tstamp Timestamp.
          */
-        private void updateMetrics(UUID nodeId, ClusterMetrics metrics, long 
tstamp) {
+        private void updateMetrics(UUID nodeId,
+            ClusterMetrics metrics,
+            Map<Integer, CacheMetrics> cacheMetrics,
+            long tstamp)
+        {
             assert nodeId != null;
             assert metrics != null;
+            assert cacheMetrics != null;
 
             TcpDiscoveryNode node = nodeId.equals(getLocalNodeId()) ? locNode 
: rmtNodes.get(nodeId);
 
             if (node != null && node.visible()) {
                 node.setMetrics(metrics);
-
-                node.lastUpdateTime(tstamp);
-
-                notifyDiscovery(EVT_NODE_METRICS_UPDATED, topVer, node, 
allNodes());
-            }
-            else if (log.isDebugEnabled())
-                log.debug("Received metrics from unknown node: " + nodeId);
-        }
-
-        /**
-         * @param nodeId Node ID.
-         * @param cacheMetrics Cache metrics.
-         * @param tstamp Timestamp.
-         */
-        private void updateCacheMetrics(UUID nodeId, Map<Integer, 
CacheMetrics> cacheMetrics, long tstamp) {
-            assert nodeId != null;
-            assert cacheMetrics != null;
-
-            TcpDiscoveryNode node = 
nodeId.equals(ignite.configuration().getNodeId()) ? locNode : 
rmtNodes.get(nodeId);
-
-            if (node != null && node.visible()) {
                 node.setCacheMetrics(cacheMetrics);
 
                 node.lastUpdateTime(tstamp);
@@ -1199,7 +1184,7 @@ public class TcpClientDiscoverySpi extends 
TcpDiscoverySpiAdapter implements Tcp
                 notifyDiscovery(EVT_NODE_METRICS_UPDATED, topVer, node, 
allNodes());
             }
             else if (log.isDebugEnabled())
-                log.debug("Received cacheMetrics from unknown node: " + 
nodeId);
+                log.debug("Received metrics from unknown node: " + nodeId);
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6e5a73db/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 18d6194..1ce59ce 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -4253,19 +4253,19 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
             if (spiStateCopy() == CONNECTED) {
                 if (msg.hasMetrics()) {
                     for (Map.Entry<UUID, MetricsSet> e : 
msg.metrics().entrySet()) {
+                        UUID nodeId = e.getKey();
+
                         MetricsSet metricsSet = e.getValue();
 
-                        updateMetrics(e.getKey(), metricsSet.metrics(), 
tstamp);
+                        Map<Integer, CacheMetrics> cacheMetrics = 
msg.hasCacheMetrics() ?
+                                msg.cacheMetrics().get(nodeId) : 
Collections.<Integer, CacheMetrics>emptyMap();
+
+                        updateMetrics(nodeId, metricsSet.metrics(), 
cacheMetrics, tstamp);
 
                         for (T2<UUID, ClusterMetrics> t : 
metricsSet.clientMetrics())
-                            updateMetrics(t.get1(), t.get2(), tstamp);
+                            updateMetrics(t.get1(), t.get2(), cacheMetrics, 
tstamp);
                     }
                 }
-
-                if (msg.hasCacheMetrics()) {
-                    for (Map.Entry<UUID, Map<Integer, CacheMetrics>> e : 
msg.cacheMetrics().entrySet())
-                        updateCacheMetrics(e.getKey(), e.getValue(), tstamp);
-                }
             }
 
             if (ring.hasRemoteNodes()) {
@@ -4273,7 +4273,6 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
                     !hasMetrics(msg, locNodeId)) && spiStateCopy() == 
CONNECTED) {
                     // Message is on its first ring or just created on 
coordinator.
                     msg.setMetrics(locNodeId, metricsProvider.metrics());
-
                     msg.setCacheMetrics(locNodeId, 
metricsProvider.cacheMetrics());
 
                     for (Map.Entry<UUID, ClientMessageWorker> e : 
clientMsgWorkers.entrySet()) {
@@ -4288,9 +4287,7 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
                 }
                 else {
                     // Message is on its second ring.
-                    msg.removeMetrics(locNodeId);
-
-                    msg.removeCacheMetrics(locNodeId);
+                    removeMetrics(msg, locNodeId);
 
                     Collection<UUID> clientNodeIds = msg.clientNodeIds();
 
@@ -4323,37 +4320,22 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
         /**
          * @param nodeId Node ID.
          * @param metrics Metrics.
+         * @param cacheMetrics Cache metrics.
          * @param tstamp Timestamp.
          */
-        private void updateMetrics(UUID nodeId, ClusterMetrics metrics, long 
tstamp) {
+        private void updateMetrics(UUID nodeId,
+            ClusterMetrics metrics,
+            Map<Integer, CacheMetrics> cacheMetrics,
+            long tstamp)
+        {
             assert nodeId != null;
             assert metrics != null;
+            assert cacheMetrics != null;
 
             TcpDiscoveryNode node = ring.node(nodeId);
 
             if (node != null) {
                 node.setMetrics(metrics);
-
-                node.lastUpdateTime(tstamp);
-
-                notifyDiscovery(EVT_NODE_METRICS_UPDATED, 
ring.topologyVersion(), node);
-            }
-            else if (log.isDebugEnabled())
-                log.debug("Received metrics from unknown node: " + nodeId);
-        }
-
-        /**
-         * @param nodeId Node ID.
-         * @param cacheMetrics Cache metrics.
-         * @param tstamp Timestamp.
-         */
-        private void updateCacheMetrics(UUID nodeId, Map<Integer, 
CacheMetrics> cacheMetrics, long tstamp) {
-            assert nodeId != null;
-            assert cacheMetrics != null;
-
-            TcpDiscoveryNode node = ring.node(nodeId);
-
-            if (node != null && node.visible()) {
                 node.setCacheMetrics(cacheMetrics);
 
                 node.lastUpdateTime(tstamp);
@@ -4361,7 +4343,7 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
                 notifyDiscovery(EVT_NODE_METRICS_UPDATED, 
ring.topologyVersion(), node);
             }
             else if (log.isDebugEnabled())
-                log.debug("Received cacheMetrics from unknown node: " + 
nodeId);
+                log.debug("Received metrics from unknown node: " + nodeId);
         }
 
         /**
@@ -5153,8 +5135,7 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
                 if (hbMsg.creatorNodeId().equals(nodeId)) {
                     metrics = hbMsg.metrics().get(nodeId).metrics();
 
-                    hbMsg.removeMetrics(nodeId);
-                    hbMsg.removeCacheMetrics(nodeId);
+                    removeMetrics(hbMsg, nodeId);
 
                     assert !hbMsg.hasMetrics();
                     assert !hbMsg.hasCacheMetrics();
@@ -5203,4 +5184,13 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
             U.closeQuiet(sock);
         }
     }
+
+    /**
+     * @param msg Message.
+     * @param nodeId Node ID.
+     */
+    private void removeMetrics(TcpDiscoveryHeartbeatMessage msg, UUID nodeId) {
+        msg.removeMetrics(nodeId);
+        msg.removeCacheMetrics(nodeId);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6e5a73db/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
index 9be1207..7fe7b40 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
@@ -197,10 +197,12 @@ public class TcpDiscoveryNode extends 
GridMetadataAwareAdapter implements Cluste
 
     /** {@inheritDoc} */
     @Override public ClusterMetrics metrics() {
+        ClusterMetrics metrics0 = null;
+
         if (metricsProvider != null)
-            metrics = metricsProvider.metrics();
+            metrics = metrics0 = metricsProvider.metrics();
 
-        return metrics;
+        return metrics0 == null ? metrics : metrics0;
     }
 
     /**
@@ -225,10 +227,12 @@ public class TcpDiscoveryNode extends 
GridMetadataAwareAdapter implements Cluste
      * @return Runtime metrics snapshots for this node.
      */
     public Map<Integer, CacheMetrics> cacheMetrics() {
+        Map<Integer, CacheMetrics> cacheMetrics0 = null;
+
         if (metricsProvider != null)
-            cacheMetrics = metricsProvider.cacheMetrics();
+            cacheMetrics = cacheMetrics0 = metricsProvider.cacheMetrics();
 
-        return cacheMetrics;
+        return cacheMetrics0 == null ? cacheMetrics : cacheMetrics0;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6e5a73db/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java
index 9ebcf46..ddbdb2d 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java
@@ -73,8 +73,11 @@ public class CacheMetricsForClusterGroupSelfTest extends 
GridCommonAbstractTest
 
         Collection<ClusterNode> nodes = grid(0).cluster().forRemotes().nodes();
 
-        for (ClusterNode node : nodes)
-            assertNotNull(((TcpDiscoveryNode)node).cacheMetrics());
+        for (ClusterNode node : nodes) {
+            Map<Integer, CacheMetrics> metrics = ((TcpDiscoveryNode) 
node).cacheMetrics();
+            assertNotNull(metrics);
+            assertFalse(metrics.isEmpty());
+        }
 
         assertMetrics(CACHE1);
         assertMetrics(CACHE2);
@@ -97,8 +100,11 @@ public class CacheMetricsForClusterGroupSelfTest extends 
GridCommonAbstractTest
 
         Collection<ClusterNode> nodes = grid(0).cluster().forRemotes().nodes();
 
-        for (ClusterNode node : nodes)
-            assertNull(((TcpDiscoveryNode)node).cacheMetrics());
+        for (ClusterNode node : nodes) {
+            Map<Integer, CacheMetrics> metrics = ((TcpDiscoveryNode) 
node).cacheMetrics();
+            assertNotNull(metrics);
+            assertTrue(metrics.isEmpty());
+        }
     }
 
     /**

Reply via email to