ignite-366 Metrics for caches should work in clustered mode

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b73fb49c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b73fb49c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b73fb49c

Branch: refs/heads/ignite-366
Commit: b73fb49c3ebdf1c552b12e84e492a3db21e4d52a
Parents: b4c51f4
Author: Andrey Gura <ag...@gridgain.com>
Authored: Thu Apr 2 22:10:23 2015 +0300
Committer: Andrey Gura <ag...@gridgain.com>
Committed: Thu Apr 2 22:18:39 2015 +0300

----------------------------------------------------------------------
 .../java/org/apache/ignite/IgniteCache.java     |   8 +
 .../org/apache/ignite/cluster/ClusterNode.java  |   2 +-
 .../discovery/GridDiscoveryManager.java         |   8 +-
 .../processors/cache/CacheMetricsSnapshot.java  | 269 ++++++++++++++++++-
 .../processors/cache/IgniteCacheProxy.java      |  15 ++
 .../spi/discovery/DiscoveryMetricsProvider.java |   2 +-
 .../discovery/tcp/TcpClientDiscoverySpi.java    |   5 +-
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |  45 +++-
 .../tcp/internal/TcpDiscoveryNode.java          |   6 +-
 .../messages/TcpDiscoveryHeartbeatMessage.java  |  41 ++-
 .../ignite/internal/GridDiscoverySelfTest.java  |   2 +-
 ...hePartitionedMetricsForClusterGroupTest.java |  72 +++++
 .../ignite/p2p/GridP2PClassLoadingSelfTest.java |   2 +-
 .../ignite/testframework/GridTestNode.java      |   4 +-
 .../junits/spi/GridSpiAbstractTest.java         |   4 +-
 15 files changed, 459 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b73fb49c/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
index 5c5bb25..d133ab7 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cache.affinity.*;
 import org.apache.ignite.cache.affinity.rendezvous.*;
 import org.apache.ignite.cache.query.*;
 import org.apache.ignite.cache.store.*;
+import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.mxbean.*;
@@ -514,6 +515,13 @@ public interface IgniteCache<K, V> extends 
javax.cache.Cache<K, V>, IgniteAsyncS
     public CacheMetrics metrics();
 
     /**
+     * Gets snapshot metrics for caches in cluster group.
+     * @param grp Cluster group.
+     * @return Cache metrics.
+     */
+    public CacheMetrics metrics(ClusterGroup grp);
+
+    /**
      * Gets MxBean for this cache.
      *
      * @return MxBean.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b73fb49c/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java 
b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java
index 1c2e829..bc1636e 100644
--- a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java
+++ b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java
@@ -159,7 +159,7 @@ public interface ClusterNode {
      *
      * @return Runtime metrics snapshots for this node.
      */
-    public Collection<CacheMetrics> cacheMetrics();
+    public Map<String, CacheMetrics> cacheMetrics();
 
     /**
      * Gets all node attributes. Attributes are assigned to nodes at startup

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b73fb49c/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 5fb8616..76afa73 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -648,16 +648,16 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
             }
 
             /** {@inheritDoc} */
-            @Override public Collection<CacheMetrics> cacheMetrics() {
+            @Override public Map<String, CacheMetrics> cacheMetrics() {
                 Collection<GridCache<?, ?>> caches = ctx.cache().caches();
 
                 if (F.isEmpty(caches))
-                    return Collections.emptyList();
+                    return Collections.emptyMap();
 
-                List<CacheMetrics> metrics = new ArrayList<>(caches.size());
+                Map<String, CacheMetrics> metrics = 
U.newHashMap(caches.size());
 
                 for (GridCache<?, ?> cache : caches)
-                    metrics.add(cache.metrics());
+                    metrics.put(cache.name(), cache.metrics());
 
                 return metrics;
             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b73fb49c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
index e20259f..4e6a256 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
@@ -20,10 +20,16 @@ package org.apache.ignite.internal.processors.cache;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 
+import java.io.*;
+import java.util.*;
+
 /**
  * Metrics snapshot.
  */
-public class CacheMetricsSnapshot implements CacheMetrics {
+public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
+    /** Last update time. */
+    private long lastUpdateTime = -1;
+
     /** Number of reads. */
     private long reads = 0;
 
@@ -246,6 +252,131 @@ public class CacheMetricsSnapshot implements CacheMetrics 
{
         isWriteThrough = m.isWriteThrough();
     }
 
+    /**
+     * Constructs merged cachemetrics.
+     *
+     * @param metrics Metrics for merge.
+     */
+    public CacheMetricsSnapshot(Collection<CacheMetrics> metrics) {
+        A.notEmpty(metrics, "metrics");
+
+        // TODO: Only once
+        /*
+        cacheName
+        isEmpty
+        isWriteBehindEnabled
+        writeBehindFlushSize
+        writeBehindFlushThreadCount
+        writeBehindFlushFrequency
+        writeBehindStoreBatchSize
+        writeBehindTotalCriticalOverflowCount
+        writeBehindCriticalOverflowCount
+        writeBehindErrorRetryCount
+        writeBehindBufferSize
+
+        keyType
+        valueType
+        isStoreByValue
+        isStatisticsEnabled
+        isManagementEnabled
+        isReadThrough
+        isWriteThrough
+         */
+
+        //TODO: How to merge?
+        /*
+        overflowSize
+        size (only once PRIMARY?)
+         */
+
+        boolean first = true;
+
+        for (CacheMetrics e : metrics) {
+
+            if (first) {
+                cacheName = e.name();
+                isEmpty = e.isEmpty();
+                isWriteBehindEnabled = e.isWriteBehindEnabled();
+                writeBehindFlushSize = e.getWriteBehindFlushSize();
+                writeBehindFlushThreadCount = 
e.getWriteBehindFlushThreadCount();
+                writeBehindFlushFrequency = e.getWriteBehindFlushFrequency();
+                writeBehindStoreBatchSize = e.getWriteBehindStoreBatchSize();
+                writeBehindTotalCriticalOverflowCount = 
e.getWriteBehindTotalCriticalOverflowCount();
+                writeBehindCriticalOverflowCount = 
e.getWriteBehindCriticalOverflowCount();
+                writeBehindErrorRetryCount = e.getWriteBehindErrorRetryCount();
+                writeBehindBufferSize = e.getWriteBehindBufferSize();
+
+                keyType = e.getKeyType();
+                valueType = e.getValueType();
+                isStoreByValue = e.isStoreByValue();
+                isStatisticsEnabled = e.isStatisticsEnabled();
+                isManagementEnabled = e.isManagementEnabled();
+                isReadThrough = e.isReadThrough();
+                isWriteThrough = e.isWriteThrough();
+
+                first = false;
+            }
+
+            reads += e.getCacheGets();
+            puts += e.getCachePuts();
+            hits += e.getCacheHits();
+            misses += e.getCacheHits();
+            txCommits += e.getCacheTxCommits();
+            txRollbacks += e.getCacheTxRollbacks();
+            evicts += e.getCacheEvictions();
+            removes += e.getCacheRemovals();
+
+            putAvgTimeNanos += e.getAveragePutTime();
+            getAvgTimeNanos += e.getAverageGetTime();
+            removeAvgTimeNanos += e.getAverageRemoveTime();
+            commitAvgTimeNanos += e.getAverageTxCommitTime();
+            rollbackAvgTimeNanos += e.getAverageTxRollbackTime();
+
+            offHeapEntriesCount += e.getOffHeapEntriesCount();
+            offHeapAllocatedSize += e.getOffHeapAllocatedSize();
+            keySize += e.getKeySize();
+            dhtEvictQueueCurrentSize += e.getDhtEvictQueueCurrentSize();
+            txThreadMapSize += e.getTxThreadMapSize();
+            txXidMapSize += e.getTxXidMapSize();
+            txCommitQueueSize += e.getTxCommitQueueSize();
+            txPrepareQueueSize += e.getTxPrepareQueueSize();
+            txStartVersionCountsSize += e.getTxStartVersionCountsSize();
+            txCommittedVersionsSize += e.getTxCommittedVersionsSize();
+            txRolledbackVersionsSize += e.getTxRolledbackVersionsSize();
+            txDhtThreadMapSize += e.getTxDhtThreadMapSize();
+            txDhtXidMapSize += e.getTxDhtXidMapSize();
+            txDhtCommitQueueSize += e.getTxDhtCommitQueueSize();
+            txDhtPrepareQueueSize += e.getTxDhtPrepareQueueSize();
+            txDhtStartVersionCountsSize += e.getTxDhtStartVersionCountsSize();
+            txDhtCommittedVersionsSize += e.getTxDhtCommittedVersionsSize();
+            txDhtRolledbackVersionsSize += e.getTxDhtRolledbackVersionsSize();
+        }
+
+        putAvgTimeNanos /= metrics.size();
+        getAvgTimeNanos /= metrics.size();
+        removeAvgTimeNanos /= metrics.size();
+        commitAvgTimeNanos /= metrics.size();
+        rollbackAvgTimeNanos /= metrics.size();
+    }
+
+    /**
+     * Gets last update time of this node metrics.
+     *
+     * @return Last update time.
+     */
+    public long getLastUpdateTime() {
+        return lastUpdateTime;
+    }
+
+    /**
+     * Sets last update time.
+     *
+     * @param lastUpdateTime Last update time.
+     */
+    public void setLastUpdateTime(long lastUpdateTime) {
+        this.lastUpdateTime = lastUpdateTime;
+    }
+
     /** {@inheritDoc} */
     @Override public long getCacheHits() {
         return hits;
@@ -266,9 +397,8 @@ public class CacheMetricsSnapshot implements CacheMetrics {
 
     /** {@inheritDoc} */
     @Override public float getCacheMissPercentage() {
-        if (misses == 0 || reads == 0) {
+        if (misses == 0 || reads == 0)
             return 0;
-        }
 
         return (float) misses / reads * 100.0f;
     }
@@ -283,6 +413,14 @@ public class CacheMetricsSnapshot implements CacheMetrics {
         return puts;
     }
 
+    /**
+     * Sets the total number of puts to the cache.
+     * @param puts The total number of puts to the cache.
+     */
+    public void setCachePuts(long puts) {
+        this.puts = puts;
+    }
+
     /** {@inheritDoc} */
     @Override public long getCacheRemovals() {
         return removes;
@@ -303,6 +441,15 @@ public class CacheMetricsSnapshot implements CacheMetrics {
         return putAvgTimeNanos;
     }
 
+    /**
+     * Sets the mean time to execute puts.
+     *
+     * @param putAvgTimeNanos The time in µs.
+     */
+    public void setAveragePutTime(float putAvgTimeNanos) {
+        this.putAvgTimeNanos = putAvgTimeNanos;
+    }
+
     /** {@inheritDoc} */
     @Override public float getAverageRemoveTime() {
         return removeAvgTimeNanos;
@@ -522,4 +669,120 @@ public class CacheMetricsSnapshot implements CacheMetrics 
{
     @Override public String toString() {
         return S.toString(CacheMetricsSnapshot.class, this);
     }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeLong(reads);
+        out.writeLong(puts);
+        out.writeLong(hits);
+        out.writeLong(misses);
+        out.writeLong(txCommits);
+        out.writeLong(txRollbacks);
+        out.writeLong(evicts);
+        out.writeLong(removes);
+
+        out.writeFloat(putAvgTimeNanos);
+        out.writeFloat(getAvgTimeNanos);
+        out.writeFloat(removeAvgTimeNanos);
+        out.writeFloat(commitAvgTimeNanos);
+        out.writeFloat(rollbackAvgTimeNanos);
+
+        U.writeUTFStringNullable(out, cacheName);
+        out.writeLong(overflowSize);
+        out.writeLong(offHeapEntriesCount);
+        out.writeLong(offHeapAllocatedSize);
+        out.writeInt(size);
+        out.writeInt(keySize);
+        out.writeBoolean(isEmpty);
+        out.writeInt(dhtEvictQueueCurrentSize);
+        out.writeInt(txThreadMapSize);
+        out.writeInt(txXidMapSize);
+        out.writeInt(txCommitQueueSize);
+        out.writeInt(txPrepareQueueSize);
+        out.writeInt(txStartVersionCountsSize);
+        out.writeInt(txCommittedVersionsSize);
+        out.writeInt(txRolledbackVersionsSize);
+        out.writeInt(txDhtThreadMapSize);
+        out.writeInt(txDhtXidMapSize);
+        out.writeInt(txDhtCommitQueueSize);
+        out.writeInt(txDhtPrepareQueueSize);
+        out.writeInt(txDhtStartVersionCountsSize);
+        out.writeInt(txDhtCommittedVersionsSize);
+        out.writeInt(txDhtRolledbackVersionsSize);
+        out.writeBoolean(isWriteBehindEnabled);
+        out.writeInt(writeBehindFlushSize);
+        out.writeInt(writeBehindFlushThreadCount);
+        out.writeLong(writeBehindFlushFrequency);
+        out.writeInt(writeBehindStoreBatchSize);
+        out.writeInt(writeBehindTotalCriticalOverflowCount);
+        out.writeInt(writeBehindCriticalOverflowCount);
+        out.writeInt(writeBehindErrorRetryCount);
+        out.writeInt(writeBehindBufferSize);
+
+        out.writeUTF(keyType);
+        out.writeUTF(valueType);
+        out.writeBoolean(isStoreByValue);
+        out.writeBoolean(isStatisticsEnabled);
+        out.writeBoolean(isManagementEnabled);
+        out.writeBoolean(isReadThrough);
+        out.writeBoolean(isWriteThrough);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+        reads = in.readLong();
+        puts = in.readLong();
+        hits = in.readLong();
+        misses = in.readLong();
+        txCommits = in.readLong();
+        txRollbacks = in.readLong();
+        evicts = in.readLong();
+        removes = in.readLong();
+
+        putAvgTimeNanos = in.readFloat();
+        getAvgTimeNanos = in.readFloat();
+        removeAvgTimeNanos = in.readFloat();
+        commitAvgTimeNanos = in.readFloat();
+        rollbackAvgTimeNanos = in.readFloat();
+
+        cacheName = U.readUTFStringNullable(in);
+        overflowSize = in.readLong();
+        offHeapEntriesCount = in.readLong();
+        offHeapAllocatedSize = in.readLong();
+        size = in.readInt();
+        keySize = in.readInt();
+        isEmpty = in.readBoolean();
+        dhtEvictQueueCurrentSize = in.readInt();
+        txThreadMapSize = in.readInt();
+        txXidMapSize = in.readInt();
+        txCommitQueueSize = in.readInt();
+        txPrepareQueueSize = in.readInt();
+        txStartVersionCountsSize = in.readInt();
+        txCommittedVersionsSize = in.readInt();
+        txRolledbackVersionsSize = in.readInt();
+        txDhtThreadMapSize = in.readInt();
+        txDhtXidMapSize = in.readInt();
+        txDhtCommitQueueSize = in.readInt();
+        txDhtPrepareQueueSize = in.readInt();
+        txDhtStartVersionCountsSize = in.readInt();
+        txDhtCommittedVersionsSize = in.readInt();
+        txDhtRolledbackVersionsSize = in.readInt();
+        isWriteBehindEnabled = in.readBoolean();
+        writeBehindFlushSize = in.readInt();
+        writeBehindFlushThreadCount = in.readInt();
+        writeBehindFlushFrequency = in.readLong();
+        writeBehindStoreBatchSize = in.readInt();
+        writeBehindTotalCriticalOverflowCount = in.readInt();
+        writeBehindCriticalOverflowCount = in.readInt();
+        writeBehindErrorRetryCount = in.readInt();
+        writeBehindBufferSize = in.readInt();
+
+        keyType = in.readUTF();
+        valueType = in.readUTF();
+        isStoreByValue = in.readBoolean();
+        isStatisticsEnabled = in.readBoolean();
+        isManagementEnabled = in.readBoolean();
+        isReadThrough = in.readBoolean();
+        isWriteThrough = in.readBoolean();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b73fb49c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index dfc3ef4..6b7d709 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -140,6 +140,21 @@ public class IgniteCacheProxy<K, V> extends 
AsyncSupportAdapter<IgniteCache<K, V
     }
 
     /** {@inheritDoc} */
+    @Override public CacheMetrics metrics(ClusterGroup grp) {
+        List<CacheMetrics> metrics = new ArrayList<>(grp.nodes().size());
+
+        for (ClusterNode node : grp.nodes())
+            metrics.add(node.cacheMetrics().get(getName()));
+
+        if (F.isEmpty(metrics)) {
+            //TODO: what actually need to be returned?
+            return new CacheMetricsSnapshot();
+        }
+
+        return new CacheMetricsSnapshot(metrics);
+    }
+
+    /** {@inheritDoc} */
     @Override public CacheMetricsMXBean mxBean() {
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b73fb49c/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryMetricsProvider.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryMetricsProvider.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryMetricsProvider.java
index f3c98de..eb1deb9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryMetricsProvider.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryMetricsProvider.java
@@ -45,5 +45,5 @@ public interface DiscoveryMetricsProvider {
      *
      * @return metrics data about all caches on local node.
      */
-    public Collection<CacheMetrics> cacheMetrics();
+    public Map<String, CacheMetrics> cacheMetrics();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b73fb49c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
index a3bff02..9cfee1a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
@@ -1113,7 +1113,7 @@ public class TcpClientDiscoverySpi extends 
TcpDiscoverySpiAdapter implements Tcp
                 }
 
                 if (msg.hasCacheMetrics()) {
-                    for (Map.Entry<UUID,Collection<CacheMetrics>> e : 
msg.cacheMetrics().entrySet())
+                    for (Map.Entry<UUID, Map<String, CacheMetrics>> e : 
msg.cacheMetrics().entrySet())
                         updateCacheMetrics(e.getKey(), e.getValue(), tstamp);
                 }
             }
@@ -1185,7 +1185,7 @@ public class TcpClientDiscoverySpi extends 
TcpDiscoverySpiAdapter implements Tcp
          * @param cacheMetrics Cache metrics.
          * @param tstamp Timestamp.
          */
-        private void updateCacheMetrics(UUID nodeId, Collection<CacheMetrics> 
cacheMetrics, long tstamp) {
+        private void updateCacheMetrics(UUID nodeId, Map<String, CacheMetrics> 
cacheMetrics, long tstamp) {
             assert nodeId != null;
             assert cacheMetrics != null;
 
@@ -1202,7 +1202,6 @@ public class TcpClientDiscoverySpi extends 
TcpDiscoverySpiAdapter implements Tcp
                 log.debug("Received cacheMetrics from unknown node: " + 
nodeId);
         }
 
-
         /**
          * @param topVer New topology version.
          * @return Latest topology snapshot.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b73fb49c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index d5c05f7..856ede2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.spi.discovery.tcp;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.events.DiscoveryEvent;
@@ -4351,7 +4352,7 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
                 return;
             }
 
-            if (locNodeId.equals(msg.creatorNodeId()) && 
!msg.hasMetrics(locNodeId) && msg.senderNodeId() != null) {
+            if (locNodeId.equals(msg.creatorNodeId()) && !hasMetrics(msg, 
locNodeId) && msg.senderNodeId() != null) {
                 if (log.isDebugEnabled())
                     log.debug("Discarding heartbeat message that has made two 
passes: " + msg);
 
@@ -4371,14 +4372,21 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
                             updateMetrics(t.get1(), t.get2(), tstamp);
                     }
                 }
+
+                if (msg.hasCacheMetrics()) {
+                    for (Map.Entry<UUID, Map<String, CacheMetrics>> e : 
msg.cacheMetrics().entrySet())
+                        updateCacheMetrics(e.getKey(), e.getValue(), tstamp);
+                }
             }
 
             if (ring.hasRemoteNodes()) {
                 if ((locNodeId.equals(msg.creatorNodeId()) && 
msg.senderNodeId() == null ||
-                    !msg.hasMetrics(locNodeId)) && spiStateCopy() == 
CONNECTED) {
+                    !hasMetrics(msg, locNodeId)) && spiStateCopy() == 
CONNECTED) {
                     // Message is on its first ring or just created on 
coordinator.
                     msg.setMetrics(locNodeId, metricsProvider.metrics());
 
+                    msg.setCacheMetrics(locNodeId, 
metricsProvider.cacheMetrics());
+
                     for (Map.Entry<UUID, ClientMessageWorker> e : 
clientMsgWorkers.entrySet()) {
                         UUID nodeId = e.getKey();
                         ClusterMetrics metrics = e.getValue().metrics();
@@ -4393,6 +4401,8 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
                     // Message is on its second ring.
                     msg.removeMetrics(locNodeId);
 
+                    msg.removeCacheMetrics(locNodeId);
+
                     Collection<UUID> clientNodeIds = msg.clientNodeIds();
 
                     for (TcpDiscoveryNode clientNode : ring.clientNodes()) {
@@ -4444,6 +4454,35 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
         }
 
         /**
+         * @param nodeId Node ID.
+         * @param cacheMetrics Cache metrics.
+         * @param tstamp Timestamp.
+         */
+        private void updateCacheMetrics(UUID nodeId, Map<String, CacheMetrics> 
cacheMetrics, long tstamp) {
+            assert nodeId != null;
+            assert cacheMetrics != null;
+
+            TcpDiscoveryNode node = ring.node(nodeId);
+
+            if (node != null && node.visible()) {
+                node.setCacheMetrics(cacheMetrics);
+
+                node.lastUpdateTime(tstamp);
+
+                notifyDiscovery(EVT_NODE_METRICS_UPDATED, 
ring.topologyVersion(), node);
+            }
+            else if (log.isDebugEnabled())
+                log.debug("Received cacheMetrics from unknown node: " + 
nodeId);
+        }
+
+        /**
+         * @param msg Message.
+         */
+        private boolean hasMetrics(TcpDiscoveryHeartbeatMessage msg, UUID 
nodeId) {
+            return msg.hasMetrics(nodeId) || msg.hasCacheMetrics(nodeId);
+        }
+
+        /**
          * Processes discard message and discards previously registered 
pending messages.
          *
          * @param msg Discard message.
@@ -5226,8 +5265,10 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
                     metrics = hbMsg.metrics().get(nodeId).metrics();
 
                     hbMsg.removeMetrics(nodeId);
+                    hbMsg.removeCacheMetrics(nodeId);
 
                     assert !hbMsg.hasMetrics();
+                    assert !hbMsg.hasCacheMetrics();
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b73fb49c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
index 89924fb..7c4d17b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
@@ -76,7 +76,7 @@ public class TcpDiscoveryNode extends 
GridMetadataAwareAdapter implements Cluste
 
     /** Node cache metrics. */
     @GridToStringExclude
-    private volatile Collection<CacheMetrics> cacheMetrics;
+    private volatile Map<String, CacheMetrics> cacheMetrics;
 
     /** Node order in the topology. */
     private volatile long order;
@@ -215,7 +215,7 @@ public class TcpDiscoveryNode extends 
GridMetadataAwareAdapter implements Cluste
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<CacheMetrics> cacheMetrics() {
+    @Override public Map<String, CacheMetrics> cacheMetrics() {
         if (metricsProvider != null)
             cacheMetrics = metricsProvider.cacheMetrics();
 
@@ -227,7 +227,7 @@ public class TcpDiscoveryNode extends 
GridMetadataAwareAdapter implements Cluste
      *
      * @param cacheMetrics Cache metrics.
      */
-    public void setCacheMetrics(Collection<CacheMetrics> cacheMetrics) {
+    public void setCacheMetrics(Map<String, CacheMetrics> cacheMetrics) {
         assert cacheMetrics != null;
 
         this.cacheMetrics = cacheMetrics;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b73fb49c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java
index 095edb7..c1e434e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java
@@ -20,6 +20,7 @@ package org.apache.ignite.spi.discovery.tcp.messages;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -55,7 +56,7 @@ public class TcpDiscoveryHeartbeatMessage extends 
TcpDiscoveryAbstractMessage {
 
     /** Cahce metrics by node. */
     @GridToStringExclude
-    private Map<UUID, Collection<CacheMetrics>> cacheMetrics;
+    private Map<UUID, Map<String, CacheMetrics>> cacheMetrics;
 
     /**
      * Public default no-arg constructor for {@link Externalizable} interface.
@@ -97,7 +98,7 @@ public class TcpDiscoveryHeartbeatMessage extends 
TcpDiscoveryAbstractMessage {
      * @param nodeId Node ID.
      * @param metrics Node cache metrics.
      */
-    public void setCacheMetrics(UUID nodeId, Collection<CacheMetrics> metrics) 
{
+    public void setCacheMetrics(UUID nodeId, Map<String, CacheMetrics> 
metrics) {
         assert nodeId != null;
         assert metrics != null;
         assert !this.cacheMetrics.containsKey(nodeId);
@@ -157,7 +158,7 @@ public class TcpDiscoveryHeartbeatMessage extends 
TcpDiscoveryAbstractMessage {
      *
      * @return Cache metrics map.
      */
-    public Map<UUID, Collection<CacheMetrics>> cacheMetrics() {
+    public Map<UUID, Map<String, CacheMetrics>> cacheMetrics() {
         return cacheMetrics;
     }
 
@@ -226,6 +227,21 @@ public class TcpDiscoveryHeartbeatMessage extends 
TcpDiscoveryAbstractMessage {
             }
         }
 
+        out.writeInt(cacheMetrics.size());
+
+        if (!cacheMetrics.isEmpty()) {
+            for (Map.Entry<UUID, Map<String, CacheMetrics>> e : 
cacheMetrics.entrySet()) {
+                U.writeUuid(out, e.getKey());
+
+                Map<String, CacheMetrics> ms = e.getValue();
+
+                out.writeInt(ms == null ? 0 : ms.size());
+
+                for (Map.Entry<String, CacheMetrics> m : ms.entrySet())
+                    out.writeObject(m.getValue());
+            }
+        }
+
         U.writeCollection(out, clientNodeIds);
     }
 
@@ -240,6 +256,25 @@ public class TcpDiscoveryHeartbeatMessage extends 
TcpDiscoveryAbstractMessage {
         for (int i = 0; i < metricsSize; i++)
             metrics.put(U.readUuid(in), (MetricsSet)in.readObject());
 
+        int cacheMetricsSize = in.readInt();
+
+        cacheMetrics = U.newHashMap(cacheMetricsSize);
+
+        for (int i = 0; i < cacheMetricsSize; i++) {
+            UUID uuid = U.readUuid(in);
+
+            int size = in.readInt();
+
+            Map<String, CacheMetrics> ms = U.newHashMap(size);
+
+            for (int j = 0; j < size; j++) {
+                CacheMetricsSnapshot m = (CacheMetricsSnapshot) 
in.readObject();
+                ms.put(m.name(), m);
+            }
+
+            cacheMetrics.put(uuid, ms);
+        }
+
         clientNodeIds = U.readCollection(in);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b73fb49c/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java
index 3c2ccbb..5f47d5e 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java
@@ -376,7 +376,7 @@ public class GridDiscoverySelfTest extends 
GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Override public Collection<CacheMetrics> cacheMetrics() {
+        @Override public Map<String, CacheMetrics> cacheMetrics() {
             return null;
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b73fb49c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedMetricsForClusterGroupTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedMetricsForClusterGroupTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedMetricsForClusterGroupTest.java
new file mode 100644
index 0000000..dce9dda
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedMetricsForClusterGroupTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.processors.cache.*;
+
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.cache.CacheRebalanceMode.*;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+
+/**
+ *
+ */
+public class GridCachePartitionedMetricsForClusterGroupTest extends 
GridCacheAbstractMetricsSelfTest {
+    /** Grid count. */
+    private static final int GRID_CNT = 3;
+    public static final String CACHE_1 = "cache1";
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return GRID_CNT;
+    }
+
+    @Override protected CacheConfiguration cacheConfiguration(String gridName) 
throws Exception {
+        CacheConfiguration ccfg = super.cacheConfiguration(gridName);
+
+        ccfg.setBackups(0);
+
+        return ccfg;
+    }
+
+    public void testMetrics() throws Exception {
+        IgniteCache<Integer, Integer> cache = grid(0).cache(null);
+
+        for (int i = 0; i < 1000; i++)
+            cache.put(i, i);
+
+        for (int i = 0; i < GRID_CNT; i++) {
+            IgniteCache<Integer, Integer> c = grid(i).cache(null);
+            System.out.println("!!! Grid(" + i + "), puts = " + 
c.metrics().getCachePuts() + ", size = " + c.size());
+        }
+
+        Thread.sleep(10000);
+
+        CacheMetrics metrics = cache.metrics(grid(0).cluster().forRemotes());
+
+        System.out.println(metrics);
+/*
+        assertEquals(1, cache.metrics().getCachePuts());
+        assertEquals(0, grid(1).cache(null).metrics().getCachePuts());
+        assertEquals(1, grid(2).cache(null).metrics().getCachePuts());
+*/
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b73fb49c/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PClassLoadingSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PClassLoadingSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PClassLoadingSelfTest.java
index d9ffc8e..6622288 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PClassLoadingSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PClassLoadingSelfTest.java
@@ -116,7 +116,7 @@ public class GridP2PClassLoadingSelfTest extends 
GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Override public Collection<CacheMetrics> cacheMetrics() {
+        @Override public Map<String, CacheMetrics> cacheMetrics() {
             return null;
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b73fb49c/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java 
b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java
index e5eea88..aed66ad 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java
@@ -57,7 +57,7 @@ public class GridTestNode extends GridMetadataAwareAdapter 
implements ClusterNod
     private ClusterMetrics metrics;
 
     /** */
-    private Collection<CacheMetrics> cacheMetrics = Collections.emptyList();
+    private Map<String, CacheMetrics> cacheMetrics = Collections.emptyMap();
 
     /** */
     private long order;
@@ -190,7 +190,7 @@ public class GridTestNode extends GridMetadataAwareAdapter 
implements ClusterNod
         return metrics;
     }
 
-    @Override public Collection<CacheMetrics> cacheMetrics() {
+    @Override public Map<String, CacheMetrics> cacheMetrics() {
         return cacheMetrics;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b73fb49c/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java
index 4d65587..2655b9d 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java
@@ -352,8 +352,8 @@ public abstract class GridSpiAbstractTest<T extends 
IgniteSpi> extends GridAbstr
             @Override public ClusterMetrics metrics() { return new 
ClusterMetricsSnapshot(); }
 
             /** {@inheritDoc} */
-            @Override public Collection<CacheMetrics> cacheMetrics() {
-                return Collections.<CacheMetrics>singletonList(new 
CacheMetricsSnapshot());
+            @Override public Map<String, CacheMetrics> cacheMetrics() {
+                return Collections.<String, CacheMetrics>singletonMap(null, 
new CacheMetricsSnapshot());
             }
         };
     }

Reply via email to