ignite-366 Metrics for caches should work in clustered mode

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d319bb5d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d319bb5d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d319bb5d

Branch: refs/heads/ignite-366
Commit: d319bb5d5881a3f1be02ef7467ff3162c565b62b
Parents: 71439bc
Author: Andrey Gura <ag...@gridgain.com>
Authored: Wed Apr 1 21:02:20 2015 +0300
Committer: Andrey Gura <ag...@gridgain.com>
Committed: Wed Apr 8 15:42:30 2015 +0300

----------------------------------------------------------------------
 .../java/org/apache/ignite/IgniteCache.java     |   9 +
 .../org/apache/ignite/cache/CacheMetrics.java   |   7 +
 .../org/apache/ignite/cluster/ClusterNode.java  |  13 +
 .../discovery/GridDiscoveryManager.java         |  17 ++
 .../processors/cache/CacheMetricsImpl.java      |  10 +-
 .../cache/CacheMetricsMXBeanImpl.java           |   5 +
 .../processors/cache/CacheMetricsSnapshot.java  | 239 ++++++++++++++++++-
 .../processors/cache/IgniteCacheProxy.java      |  17 ++
 .../spi/discovery/DiscoveryMetricsProvider.java |  10 +
 .../discovery/tcp/TcpClientDiscoverySpi.java    |  38 ++-
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |  46 +++-
 .../tcp/internal/TcpDiscoveryNode.java          |  24 ++
 .../messages/TcpDiscoveryHeartbeatMessage.java  |  94 ++++++++
 .../ignite/internal/GridDiscoverySelfTest.java  |   5 +
 .../CacheMetricsForClusterGroupSelfTest.java    | 154 ++++++++++++
 .../ignite/p2p/GridP2PClassLoadingSelfTest.java |   6 +
 .../ignite/testframework/GridTestNode.java      |   9 +
 .../junits/spi/GridSpiAbstractTest.java         |   6 +
 .../IgniteCacheMetricsSelfTestSuite.java        |   4 +
 19 files changed, 701 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d319bb5d/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
index 5c5bb25..cc0805e 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cache.affinity.*;
 import org.apache.ignite.cache.affinity.rendezvous.*;
 import org.apache.ignite.cache.query.*;
 import org.apache.ignite.cache.store.*;
+import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.mxbean.*;
@@ -514,6 +515,14 @@ public interface IgniteCache<K, V> extends 
javax.cache.Cache<K, V>, IgniteAsyncS
     public CacheMetrics metrics();
 
     /**
+     * Gets snapshot metrics for caches in cluster group.
+     *
+     * @param grp Cluster group.
+     * @return Cache metrics.
+     */
+    public CacheMetrics metrics(ClusterGroup grp);
+
+    /**
      * Gets MxBean for this cache.
      *
      * @return MxBean.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d319bb5d/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java 
b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java
index 0d87326..dad0ddd 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java
@@ -147,6 +147,13 @@ public interface CacheMetrics {
     public String name();
 
     /**
+     * Gets ID of this cache.
+     *
+     * @return Cache ID.
+     */
+    public int id();
+
+    /**
      * Gets number of entries that was swapped to disk.
      *
      * @return Number of entries that was swapped to disk.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d319bb5d/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java 
b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java
index 9cb5d3d..2c7fd46 100644
--- a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java
+++ b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.cluster;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
 import org.apache.ignite.lang.*;
 import org.jetbrains.annotations.*;
 
@@ -148,6 +149,18 @@ public interface ClusterNode {
     public ClusterMetrics metrics();
 
     /**
+     * Gets a map of cache metrics for this node, keyed by cache ID. Note that node cache 
metrics are constantly updated
+     * and provide up-to-date information about caches.
+     * <p>
+     * Cache metrics are updated with some delay which is directly related to 
heartbeat
+     * frequency. For example, when used with default
+     * {@link org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi} the update 
will happen every {@code 2} seconds.
+     *
+     * @return Runtime metrics snapshots for this node.
+     */
+    public Map<Integer, CacheMetrics> cacheMetrics();
+
+    /**
      * Gets all node attributes. Attributes are assigned to nodes at startup
      * via {@link 
org.apache.ignite.configuration.IgniteConfiguration#getUserAttributes()} method.
      * <p>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d319bb5d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 04ff423..15b6ba9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.managers.discovery;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
@@ -26,6 +27,7 @@ import org.apache.ignite.internal.managers.*;
 import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
 import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.jobmetrics.*;
 import org.apache.ignite.internal.processors.security.*;
 import org.apache.ignite.internal.util.*;
@@ -644,6 +646,21 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
 
                 return nm;
             }
+
+            /** {@inheritDoc} */
+            @Override public Map<Integer, CacheMetrics> cacheMetrics() {
+                Collection<GridCacheAdapter<?, ?>> caches = 
ctx.cache().internalCaches();
+
+                if (F.isEmpty(caches))
+                    return Collections.emptyMap();
+
+                Map<Integer, CacheMetrics> metrics = 
U.newHashMap(caches.size());
+
+                for (GridCacheAdapter<?, ?> cache : caches)
+                    metrics.put(cache.context().cacheId(), cache.metrics());
+
+                return metrics;
+            }
         };
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d319bb5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
index deebab4..8d9d02b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
@@ -110,6 +110,10 @@ public class CacheMetricsImpl implements CacheMetrics {
         this.delegate = delegate;
     }
 
+    /** {@inheritDoc} */
+    @Override public int id() {
+        return cctx.cacheId();
+    }
 
     /** {@inheritDoc} */
     @Override public String name() {
@@ -353,9 +357,8 @@ public class CacheMetricsImpl implements CacheMetrics {
         long misses0 = misses.get();
         long reads0 = reads.get();
 
-        if (misses0 == 0) {
+        if (misses0 == 0)
             return 0;
-        }
 
         return (float) misses0 / reads0 * 100.0f;
     }
@@ -468,9 +471,8 @@ public class CacheMetricsImpl implements CacheMetrics {
         txCommits.incrementAndGet();
         commitTimeNanos.addAndGet(duration);
 
-        if (delegate != null) {
+        if (delegate != null)
             delegate.onTxCommit(duration);
-        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d319bb5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsMXBeanImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsMXBeanImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsMXBeanImpl.java
index e9d547c..3dd206b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsMXBeanImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsMXBeanImpl.java
@@ -39,6 +39,11 @@ class CacheMetricsMXBeanImpl implements CacheMetricsMXBean {
     }
 
     /** {@inheritDoc} */
+    @Override public int id() {
+        return cache.context().cacheId();
+    }
+
+    /** {@inheritDoc} */
     @Override public String name() {
         return cache.metrics0().name();
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d319bb5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
index 0391f4e..61ca68b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
@@ -20,10 +20,16 @@ package org.apache.ignite.internal.processors.cache;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 
+import java.io.*;
+import java.util.*;
+
 /**
  * Metrics snapshot.
  */
-class CacheMetricsSnapshot implements CacheMetrics {
+public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
     /** Number of reads. */
     private long reads = 0;
 
@@ -63,6 +69,9 @@ class CacheMetricsSnapshot implements CacheMetrics {
     /** Commit transaction time taken nanos. */
     private float rollbackAvgTimeNanos = 0;
 
+    /** Cache ID. */
+    private int id;
+
     /** Cache name */
     private String cacheName;
 
@@ -178,6 +187,13 @@ class CacheMetricsSnapshot implements CacheMetrics {
     private boolean isWriteThrough;
 
     /**
+     * Default constructor.
+     */
+    public CacheMetricsSnapshot() {
+        // No-op.
+    }
+
+    /**
      * Create snapshot for given metrics.
      *
      * @param m Cache metrics.
@@ -198,6 +214,7 @@ class CacheMetricsSnapshot implements CacheMetrics {
         commitAvgTimeNanos = m.getAverageTxCommitTime();
         rollbackAvgTimeNanos = m.getAverageTxRollbackTime();
 
+        id = m.id();
         cacheName = m.name();
         overflowSize = m.getOverflowSize();
         offHeapEntriesCount = m.getOffHeapEntriesCount();
@@ -239,6 +256,134 @@ class CacheMetricsSnapshot implements CacheMetrics {
         isWriteThrough = m.isWriteThrough();
     }
 
+    /**
+     * Constructs merged cache metrics: non-aggregated values (ID, name, sizes, flags, write-behind settings) are
+     * copied from {@code loc}; counters are summed and average times are averaged over {@code metrics}.
+     * @param loc Metrics for cache on local node.
+     * @param metrics Per-node metrics to merge (validated with {@code A.notEmpty}).
+     */
+    public CacheMetricsSnapshot(CacheMetrics loc, Collection<CacheMetrics> 
metrics) {
+        A.notEmpty(metrics, "metrics");
+
+        id = loc.id();
+        cacheName = loc.name();
+        isEmpty = loc.isEmpty();
+        isWriteBehindEnabled = loc.isWriteBehindEnabled();
+        writeBehindFlushSize = loc.getWriteBehindFlushSize();
+        writeBehindFlushThreadCount = loc.getWriteBehindFlushThreadCount();
+        writeBehindFlushFrequency = loc.getWriteBehindFlushFrequency();
+        writeBehindStoreBatchSize = loc.getWriteBehindStoreBatchSize();
+        writeBehindBufferSize = loc.getWriteBehindBufferSize();
+        size = loc.getSize();
+        keySize = loc.getKeySize();
+
+        keyType = loc.getKeyType();
+        valueType = loc.getValueType();
+        isStoreByValue = loc.isStoreByValue();
+        isStatisticsEnabled = loc.isStatisticsEnabled();
+        isManagementEnabled = loc.isManagementEnabled();
+        isReadThrough = loc.isReadThrough();
+        isWriteThrough = loc.isWriteThrough();
+
+        for (CacheMetrics e : metrics) {
+            reads += e.getCacheGets();
+            puts += e.getCachePuts();
+            hits += e.getCacheHits();
+            misses += e.getCacheMisses();
+            txCommits += e.getCacheTxCommits();
+            txRollbacks += e.getCacheTxRollbacks();
+            evicts += e.getCacheEvictions();
+            removes += e.getCacheRemovals();
+
+            putAvgTimeNanos += e.getAveragePutTime();
+            getAvgTimeNanos += e.getAverageGetTime();
+            removeAvgTimeNanos += e.getAverageRemoveTime();
+            commitAvgTimeNanos += e.getAverageTxCommitTime();
+            rollbackAvgTimeNanos += e.getAverageTxRollbackTime();
+
+            if (e.getOverflowSize() > -1)
+                overflowSize += e.getOverflowSize();
+            else
+                overflowSize = -1;
+
+            offHeapEntriesCount += e.getOffHeapEntriesCount();
+            offHeapAllocatedSize += e.getOffHeapAllocatedSize();
+
+            if (e.getDhtEvictQueueCurrentSize() > -1)
+                dhtEvictQueueCurrentSize += e.getDhtEvictQueueCurrentSize();
+            else
+                dhtEvictQueueCurrentSize = -1;
+
+            txThreadMapSize += e.getTxThreadMapSize();
+            txXidMapSize += e.getTxXidMapSize();
+            txCommitQueueSize += e.getTxCommitQueueSize();
+            txPrepareQueueSize += e.getTxPrepareQueueSize();
+            txStartVersionCountsSize += e.getTxStartVersionCountsSize();
+            txCommittedVersionsSize += e.getTxCommittedVersionsSize();
+            txRolledbackVersionsSize += e.getTxRolledbackVersionsSize();
+
+            if (e.getTxDhtThreadMapSize() > -1)
+                txDhtThreadMapSize += e.getTxDhtThreadMapSize();
+            else
+                txDhtThreadMapSize = -1;
+
+            if (e.getTxDhtXidMapSize() > -1)
+                txDhtXidMapSize += e.getTxDhtXidMapSize();
+            else
+                txDhtXidMapSize = -1;
+
+            if (e.getTxDhtCommitQueueSize() > -1)
+                txDhtCommitQueueSize += e.getTxDhtCommitQueueSize();
+            else
+                txDhtCommitQueueSize = -1;
+
+            if (e.getTxDhtPrepareQueueSize() > -1)
+                txDhtPrepareQueueSize += e.getTxDhtPrepareQueueSize();
+            else
+                txDhtPrepareQueueSize = -1;
+
+            if (e.getTxDhtStartVersionCountsSize() > -1)
+                txDhtStartVersionCountsSize += 
e.getTxDhtStartVersionCountsSize();
+            else
+                txDhtStartVersionCountsSize = -1;
+
+            if (e.getTxDhtCommittedVersionsSize() > -1)
+                txDhtCommittedVersionsSize += 
e.getTxDhtCommittedVersionsSize();
+            else
+                txDhtCommittedVersionsSize = -1;
+
+            if (e.getTxDhtRolledbackVersionsSize() > -1)
+                txDhtRolledbackVersionsSize += 
e.getTxDhtRolledbackVersionsSize();
+            else
+                txDhtRolledbackVersionsSize = -1;
+
+            if (e.getWriteBehindTotalCriticalOverflowCount() > -1)
+                writeBehindTotalCriticalOverflowCount += 
e.getWriteBehindTotalCriticalOverflowCount();
+            else
+                writeBehindTotalCriticalOverflowCount = -1;
+
+            if (e.getWriteBehindCriticalOverflowCount() > -1)
+                writeBehindCriticalOverflowCount += 
e.getWriteBehindCriticalOverflowCount();
+            else
+                writeBehindCriticalOverflowCount = -1;
+
+            if (e.getWriteBehindErrorRetryCount() > -1)
+                writeBehindErrorRetryCount += 
e.getWriteBehindErrorRetryCount();
+            else
+                writeBehindErrorRetryCount = -1;
+        }
+
+        int size = metrics.size();
+
+        if (size > 1) {
+            putAvgTimeNanos /= size;
+            getAvgTimeNanos /= size;
+            removeAvgTimeNanos /= size;
+            commitAvgTimeNanos /= size;
+            rollbackAvgTimeNanos /= size;
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public long getCacheHits() {
         return hits;
@@ -259,9 +404,8 @@ class CacheMetricsSnapshot implements CacheMetrics {
 
     /** {@inheritDoc} */
     @Override public float getCacheMissPercentage() {
-        if (misses == 0 || reads == 0) {
+        if (misses == 0 || reads == 0)
             return 0;
-        }
 
         return (float) misses / reads * 100.0f;
     }
@@ -327,6 +471,11 @@ class CacheMetricsSnapshot implements CacheMetrics {
     }
 
     /** {@inheritDoc} */
+    @Override public int id() {
+        return id;
+    }
+
+    /** {@inheritDoc} */
     @Override public long getOverflowSize() {
         return overflowSize;
     }
@@ -515,4 +664,88 @@ class CacheMetricsSnapshot implements CacheMetrics {
     @Override public String toString() {
         return S.toString(CacheMetricsSnapshot.class, this);
     }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeInt(id);
+
+        out.writeLong(reads);
+        out.writeLong(puts);
+        out.writeLong(hits);
+        out.writeLong(misses);
+        out.writeLong(txCommits);
+        out.writeLong(txRollbacks);
+        out.writeLong(evicts);
+        out.writeLong(removes);
+
+        out.writeFloat(putAvgTimeNanos);
+        out.writeFloat(getAvgTimeNanos);
+        out.writeFloat(removeAvgTimeNanos);
+        out.writeFloat(commitAvgTimeNanos);
+        out.writeFloat(rollbackAvgTimeNanos);
+
+        out.writeLong(overflowSize);
+        out.writeLong(offHeapEntriesCount);
+        out.writeLong(offHeapAllocatedSize);
+        out.writeInt(dhtEvictQueueCurrentSize);
+        out.writeInt(txThreadMapSize);
+        out.writeInt(txXidMapSize);
+        out.writeInt(txCommitQueueSize);
+        out.writeInt(txPrepareQueueSize);
+        out.writeInt(txStartVersionCountsSize);
+        out.writeInt(txCommittedVersionsSize);
+        out.writeInt(txRolledbackVersionsSize);
+        out.writeInt(txDhtThreadMapSize);
+        out.writeInt(txDhtXidMapSize);
+        out.writeInt(txDhtCommitQueueSize);
+        out.writeInt(txDhtPrepareQueueSize);
+        out.writeInt(txDhtStartVersionCountsSize);
+        out.writeInt(txDhtCommittedVersionsSize);
+        out.writeInt(txDhtRolledbackVersionsSize);
+        out.writeInt(writeBehindTotalCriticalOverflowCount);
+        out.writeInt(writeBehindCriticalOverflowCount);
+        out.writeInt(writeBehindErrorRetryCount);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+        id = in.readInt();
+
+        reads = in.readLong();
+        puts = in.readLong();
+        hits = in.readLong();
+        misses = in.readLong();
+        txCommits = in.readLong();
+        txRollbacks = in.readLong();
+        evicts = in.readLong();
+        removes = in.readLong();
+
+        putAvgTimeNanos = in.readFloat();
+        getAvgTimeNanos = in.readFloat();
+        removeAvgTimeNanos = in.readFloat();
+        commitAvgTimeNanos = in.readFloat();
+        rollbackAvgTimeNanos = in.readFloat();
+
+        overflowSize = in.readLong();
+        offHeapEntriesCount = in.readLong();
+        offHeapAllocatedSize = in.readLong();
+        dhtEvictQueueCurrentSize = in.readInt();
+        txThreadMapSize = in.readInt();
+        txXidMapSize = in.readInt();
+        txCommitQueueSize = in.readInt();
+        txPrepareQueueSize = in.readInt();
+        txStartVersionCountsSize = in.readInt();
+        txCommittedVersionsSize = in.readInt();
+        txRolledbackVersionsSize = in.readInt();
+        txDhtThreadMapSize = in.readInt();
+        txDhtXidMapSize = in.readInt();
+        txDhtCommitQueueSize = in.readInt();
+        txDhtPrepareQueueSize = in.readInt();
+        txDhtStartVersionCountsSize = in.readInt();
+        txDhtCommittedVersionsSize = in.readInt();
+        txDhtRolledbackVersionsSize = in.readInt();
+        writeBehindTotalCriticalOverflowCount = in.readInt();
+        writeBehindCriticalOverflowCount = in.readInt();
+        writeBehindErrorRetryCount = in.readInt();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d319bb5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index dfc3ef4..e0e3972 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -140,6 +140,23 @@ public class IgniteCacheProxy<K, V> extends 
AsyncSupportAdapter<IgniteCache<K, V
     }
 
     /** {@inheritDoc} */
+    @Override public CacheMetrics metrics(ClusterGroup grp) {
+        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+        try {
+            List<CacheMetrics> metrics = new ArrayList<>(grp.nodes().size());
+
+            for (ClusterNode node : grp.nodes())
+                metrics.add(node.cacheMetrics().get(context().cacheId()));
+
+            return new CacheMetricsSnapshot(ctx.cache().metrics(), metrics);
+        }
+        finally {
+            gate.leave(prev);
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public CacheMetricsMXBean mxBean() {
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d319bb5d/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryMetricsProvider.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryMetricsProvider.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryMetricsProvider.java
index 4a03278..c2bdc53 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryMetricsProvider.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryMetricsProvider.java
@@ -17,9 +17,12 @@
 
 package org.apache.ignite.spi.discovery;
 
+import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.util.tostring.*;
 
+import java.util.*;
+
 /**
  * Provides metrics to discovery SPI. It is responsibility of discovery SPI
  * to make sure that all nodes have updated metrics data about each other.
@@ -36,4 +39,11 @@ public interface DiscoveryMetricsProvider {
      * @return Up to date metrics data about local node.
      */
     public ClusterMetrics metrics();
+
+    /**
+     * Returns metrics data about all caches on local node.
+     *
+     * @return metrics data about all caches on local node.
+     */
+    public Map<Integer, CacheMetrics> cacheMetrics();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d319bb5d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
index bf69efb..e548488 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.spi.discovery.tcp;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
@@ -1067,7 +1068,11 @@ public class TcpClientDiscoverySpi extends 
TcpDiscoverySpiAdapter implements Tcp
                     Socket sock0 = sock;
 
                     if (sock0 != null) {
-                        msg.setMetrics(getLocalNodeId(), 
metricsProvider.metrics());
+                        UUID nodeId = ignite.configuration().getNodeId();
+
+                        msg.setMetrics(nodeId, metricsProvider.metrics());
+
+                        msg.setCacheMetrics(nodeId, 
metricsProvider.cacheMetrics());
 
                         try {
                             writeToSocket(sock0, msg);
@@ -1094,9 +1099,9 @@ public class TcpClientDiscoverySpi extends 
TcpDiscoverySpiAdapter implements Tcp
                     log.debug("Received heartbeat response: " + msg);
             }
             else {
-                if (msg.hasMetrics()) {
-                    long tstamp = U.currentTimeMillis();
+                long tstamp = U.currentTimeMillis();
 
+                if (msg.hasMetrics()) {
                     for (Map.Entry<UUID, MetricsSet> e : 
msg.metrics().entrySet()) {
                         MetricsSet metricsSet = e.getValue();
 
@@ -1106,6 +1111,11 @@ public class TcpClientDiscoverySpi extends 
TcpDiscoverySpiAdapter implements Tcp
                             updateMetrics(t.get1(), t.get2(), tstamp);
                     }
                 }
+
+                if (msg.hasCacheMetrics()) {
+                    for (Map.Entry<UUID, Map<Integer, CacheMetrics>> e : 
msg.cacheMetrics().entrySet())
+                        updateCacheMetrics(e.getKey(), e.getValue(), tstamp);
+                }
             }
         }
 
@@ -1171,6 +1181,28 @@ public class TcpClientDiscoverySpi extends 
TcpDiscoverySpiAdapter implements Tcp
         }
 
         /**
+         * @param nodeId Node ID.
+         * @param cacheMetrics Cache metrics.
+         * @param tstamp Timestamp.
+         */
+        private void updateCacheMetrics(UUID nodeId, Map<Integer, 
CacheMetrics> cacheMetrics, long tstamp) {
+            assert nodeId != null;
+            assert cacheMetrics != null;
+
+            TcpDiscoveryNode node = 
nodeId.equals(ignite.configuration().getNodeId()) ? locNode : 
rmtNodes.get(nodeId);
+
+            if (node != null && node.visible()) {
+                node.setCacheMetrics(cacheMetrics);
+
+                node.lastUpdateTime(tstamp);
+
+                notifyDiscovery(EVT_NODE_METRICS_UPDATED, topVer, node, 
allNodes());
+            }
+            else if (log.isDebugEnabled())
+                log.debug("Received cacheMetrics from unknown node: " + 
nodeId);
+        }
+
+        /**
          * @param topVer New topology version.
          * @return Latest topology snapshot.
          */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d319bb5d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index bad8837..57a8869 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.spi.discovery.tcp;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.events.DiscoveryEvent;
@@ -4240,7 +4241,7 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
                 return;
             }
 
-            if (locNodeId.equals(msg.creatorNodeId()) && 
!msg.hasMetrics(locNodeId) && msg.senderNodeId() != null) {
+            if (locNodeId.equals(msg.creatorNodeId()) && !hasMetrics(msg, 
locNodeId) && msg.senderNodeId() != null) {
                 if (log.isDebugEnabled())
                     log.debug("Discarding heartbeat message that has made two 
passes: " + msg);
 
@@ -4260,14 +4261,21 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
                             updateMetrics(t.get1(), t.get2(), tstamp);
                     }
                 }
+
+                if (msg.hasCacheMetrics()) {
+                    for (Map.Entry<UUID, Map<Integer, CacheMetrics>> e : 
msg.cacheMetrics().entrySet())
+                        updateCacheMetrics(e.getKey(), e.getValue(), tstamp);
+                }
             }
 
             if (ring.hasRemoteNodes()) {
                 if ((locNodeId.equals(msg.creatorNodeId()) && 
msg.senderNodeId() == null ||
-                    !msg.hasMetrics(locNodeId)) && spiStateCopy() == 
CONNECTED) {
+                    !hasMetrics(msg, locNodeId)) && spiStateCopy() == 
CONNECTED) {
                     // Message is on its first ring or just created on 
coordinator.
                     msg.setMetrics(locNodeId, metricsProvider.metrics());
 
+                    msg.setCacheMetrics(locNodeId, 
metricsProvider.cacheMetrics());
+
                     for (Map.Entry<UUID, ClientMessageWorker> e : 
clientMsgWorkers.entrySet()) {
                         UUID nodeId = e.getKey();
                         ClusterMetrics metrics = e.getValue().metrics();
@@ -4282,6 +4290,8 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
                     // Message is on its second ring.
                     msg.removeMetrics(locNodeId);
 
+                    msg.removeCacheMetrics(locNodeId);
+
                     Collection<UUID> clientNodeIds = msg.clientNodeIds();
 
                     for (TcpDiscoveryNode clientNode : ring.clientNodes()) {
@@ -4333,6 +4343,35 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
         }
 
         /**
+         * @param nodeId Node ID.
+         * @param cacheMetrics Cache metrics.
+         * @param tstamp Timestamp.
+         */
+        private void updateCacheMetrics(UUID nodeId, Map<Integer, 
CacheMetrics> cacheMetrics, long tstamp) {
+            assert nodeId != null;
+            assert cacheMetrics != null;
+
+            TcpDiscoveryNode node = ring.node(nodeId);
+
+            if (node != null && node.visible()) {
+                node.setCacheMetrics(cacheMetrics);
+
+                node.lastUpdateTime(tstamp);
+
+                notifyDiscovery(EVT_NODE_METRICS_UPDATED, 
ring.topologyVersion(), node);
+            }
+            else if (log.isDebugEnabled())
+                log.debug("Received cacheMetrics from unknown node: " + 
nodeId);
+        }
+
+        /**
+         * @param msg Message.
+         */
+        private boolean hasMetrics(TcpDiscoveryHeartbeatMessage msg, UUID 
nodeId) {
+            return msg.hasMetrics(nodeId) || msg.hasCacheMetrics(nodeId);
+        }
+
+        /**
          * Processes discard message and discards previously registered 
pending messages.
          *
          * @param msg Discard message.
@@ -5116,7 +5155,10 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
 
                     hbMsg.removeMetrics(nodeId);
 
+                    hbMsg.removeCacheMetrics(nodeId);
+
                     assert !hbMsg.hasMetrics();
+                    assert !hbMsg.hasCacheMetrics();
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d319bb5d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
index 450dd8c..3cafd4f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.spi.discovery.tcp.internal;
 
+import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.util.lang.*;
@@ -73,6 +74,10 @@ public class TcpDiscoveryNode extends 
GridMetadataAwareAdapter implements Cluste
     @GridToStringExclude
     private volatile ClusterMetrics metrics;
 
+    /** Node cache metrics. */
+    @GridToStringExclude
+    private volatile Map<Integer, CacheMetrics> cacheMetrics;
+
     /** Node order in the topology. */
     private volatile long order;
 
@@ -209,6 +214,25 @@ public class TcpDiscoveryNode extends 
GridMetadataAwareAdapter implements Cluste
         this.metrics = metrics;
     }
 
+    /** {@inheritDoc} */
+    @Override public Map<Integer, CacheMetrics> cacheMetrics() {
+        if (metricsProvider != null)
+            cacheMetrics = metricsProvider.cacheMetrics();
+
+        return cacheMetrics;
+    }
+
+    /**
+     * Sets node cache metrics.
+     *
+     * @param cacheMetrics Cache metrics.
+     */
+    public void setCacheMetrics(Map<Integer, CacheMetrics> cacheMetrics) {
+        assert cacheMetrics != null;
+
+        this.cacheMetrics = cacheMetrics;
+    }
+
     /**
      * @return Internal order.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d319bb5d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java
index 65eea9f..05268d3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java
@@ -17,8 +17,10 @@
 
 package org.apache.ignite.spi.discovery.tcp.messages;
 
+import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -52,6 +54,10 @@ public class TcpDiscoveryHeartbeatMessage extends 
TcpDiscoveryAbstractMessage {
     /** Client node IDs. */
     private Collection<UUID> clientNodeIds;
 
+    /** Cache metrics by node. */
+    @GridToStringExclude
+    private Map<UUID, Map<Integer, CacheMetrics>> cacheMetrics;
+
     /**
      * Public default no-arg constructor for {@link Externalizable} interface.
      */
@@ -68,6 +74,7 @@ public class TcpDiscoveryHeartbeatMessage extends 
TcpDiscoveryAbstractMessage {
         super(creatorNodeId);
 
         metrics = U.newHashMap(1);
+        cacheMetrics = U.newHashMap(1);
         clientNodeIds = new HashSet<>();
     }
 
@@ -86,6 +93,20 @@ public class TcpDiscoveryHeartbeatMessage extends 
TcpDiscoveryAbstractMessage {
     }
 
     /**
+     * Sets cache metrics for particular node.
+     *
+     * @param nodeId Node ID.
+     * @param metrics Node cache metrics.
+     */
+    public void setCacheMetrics(UUID nodeId, Map<Integer, CacheMetrics> 
metrics) {
+        assert nodeId != null;
+        assert metrics != null;
+        assert !this.cacheMetrics.containsKey(nodeId);
+
+        this.cacheMetrics.put(nodeId, metrics);
+    }
+
+    /**
      * Sets metrics for a client node.
      *
      * @param nodeId Server node ID.
@@ -113,6 +134,17 @@ public class TcpDiscoveryHeartbeatMessage extends 
TcpDiscoveryAbstractMessage {
     }
 
     /**
+     * Removes cache metrics for particular node from the message.
+     *
+     * @param nodeId Node ID.
+     */
+    public void removeCacheMetrics(UUID nodeId) {
+        assert nodeId != null;
+
+        cacheMetrics.remove(nodeId);
+    }
+
+    /**
      * Gets metrics map.
      *
      * @return Metrics map.
@@ -122,6 +154,15 @@ public class TcpDiscoveryHeartbeatMessage extends 
TcpDiscoveryAbstractMessage {
     }
 
     /**
+     * Gets cache metrics map.
+     *
+     * @return Cache metrics map.
+     */
+    public Map<UUID, Map<Integer, CacheMetrics>> cacheMetrics() {
+        return cacheMetrics;
+    }
+
+    /**
      * @return {@code True} if this message contains metrics.
      */
     public boolean hasMetrics() {
@@ -129,6 +170,13 @@ public class TcpDiscoveryHeartbeatMessage extends 
TcpDiscoveryAbstractMessage {
     }
 
     /**
+     * @return {@code True} if this message contains cache metrics.
+     */
+    public boolean hasCacheMetrics() {
+        return !cacheMetrics.isEmpty();
+    }
+
+    /**
      * @return {@code True} if this message contains metrics.
      */
     public boolean hasMetrics(UUID nodeId) {
@@ -138,6 +186,17 @@ public class TcpDiscoveryHeartbeatMessage extends 
TcpDiscoveryAbstractMessage {
     }
 
     /**
+     * @param nodeId Node ID.
+     *
+     * @return {@code True} if this message contains cache metrics for 
particular node.
+     */
+    public boolean hasCacheMetrics(UUID nodeId) {
+        assert nodeId != null;
+
+        return cacheMetrics.get(nodeId) != null;
+    }
+
+    /**
      * Gets client node IDs for  particular node.
      *
      * @return Client node IDs.
@@ -168,6 +227,21 @@ public class TcpDiscoveryHeartbeatMessage extends 
TcpDiscoveryAbstractMessage {
             }
         }
 
+        out.writeInt(cacheMetrics.size());
+
+        if (!cacheMetrics.isEmpty()) {
+            for (Map.Entry<UUID, Map<Integer, CacheMetrics>> e : 
cacheMetrics.entrySet()) {
+                U.writeUuid(out, e.getKey());
+
+                Map<Integer, CacheMetrics> ms = e.getValue();
+
+                out.writeInt(ms == null ? 0 : ms.size());
+
+                for (Map.Entry<Integer, CacheMetrics> m : ms.entrySet())
+                    out.writeObject(m.getValue());
+            }
+        }
+
         U.writeCollection(out, clientNodeIds);
     }
 
@@ -182,6 +256,26 @@ public class TcpDiscoveryHeartbeatMessage extends 
TcpDiscoveryAbstractMessage {
         for (int i = 0; i < metricsSize; i++)
             metrics.put(U.readUuid(in), (MetricsSet)in.readObject());
 
+        int cacheMetricsSize = in.readInt();
+
+        cacheMetrics = U.newHashMap(cacheMetricsSize);
+
+        for (int i = 0; i < cacheMetricsSize; i++) {
+            UUID uuid = U.readUuid(in);
+
+            int size = in.readInt();
+
+            Map<Integer, CacheMetrics> ms = U.newHashMap(size);
+
+            for (int j = 0; j < size; j++) {
+                CacheMetricsSnapshot m = (CacheMetricsSnapshot) 
in.readObject();
+
+                ms.put(m.id(), m);
+            }
+
+            cacheMetrics.put(uuid, ms);
+        }
+
         clientNodeIds = U.readCollection(in);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d319bb5d/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java
index 4a2483d..96a1729 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java
@@ -376,6 +376,11 @@ public class GridDiscoverySelfTest extends 
GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
+        @Nullable @Override public Map<Integer, CacheMetrics> cacheMetrics() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
         @Nullable @Override public Map<String, Object> attributes() {
             return null;
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d319bb5d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java
new file mode 100644
index 0000000..a3ffca9
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lang.*;
+
+/**
+ * Test for cluster wide cache metrics.
+ */
+public class CacheMetricsForClusterGroupSelfTest extends 
GridCacheAbstractSelfTest {
+    /** Grid count. */
+    private static final int GRID_CNT = 3;
+
+    /** Cache 1. */
+    private static final String CACHE1 = "cache1";
+
+    /** Cache 2. */
+    private static final String CACHE2 = "cache2";
+
+    /** Entry count cache 1. */
+    private static final int ENTRY_CNT_CACHE1 = 1000;
+
+    /** Entry count cache 2. */
+    private static final int ENTRY_CNT_CACHE2 = 500;
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return GRID_CNT;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        CacheConfiguration ccfg1 = defaultCacheConfiguration();
+        ccfg1.setName(CACHE1);
+        ccfg1.setStatisticsEnabled(true);
+
+        CacheConfiguration ccfg2 = defaultCacheConfiguration();
+        ccfg2.setName(CACHE2);
+        ccfg2.setStatisticsEnabled(true);
+
+        grid(0).getOrCreateCache(ccfg1);
+        grid(0).getOrCreateCache(ccfg2);
+    }
+
+    /**
+     * Test cluster group metrics.
+     */
+    public void testMetrics() throws Exception {
+        populateCacheData(CACHE1, ENTRY_CNT_CACHE1);
+        populateCacheData(CACHE2, ENTRY_CNT_CACHE2);
+
+        readCacheData(CACHE1, ENTRY_CNT_CACHE1);
+        readCacheData(CACHE2, ENTRY_CNT_CACHE2);
+
+        // Wait for heartbeat message
+        Thread.sleep(3000);
+
+        assertMetrics(CACHE1);
+        assertMetrics(CACHE2);
+    }
+
+    /**
+     * @param name Cache name.
+     * @param cnt Number of entries to put (key and value are both the loop index).
+     */
+    private void populateCacheData(String name, int cnt) {
+        IgniteCache<Integer, Integer> cache = grid(0).cache(name);
+
+        for (int i = 0; i < cnt; i++)
+            cache.put(i, i);
+    }
+
+    /**
+     * @param name Cache name.
+     * @param cnt Number of keys to read, starting from zero.
+     */
+    private void readCacheData(String name, int cnt) {
+        IgniteCache<Integer, Integer> cache = grid(0).cache(name);
+
+        for (int i = 0; i < cnt; i++)
+            cache.get(i);
+    }
+
+    /**
+     * @param name Cache name.
+     */
+    private void assertMetrics(String name) {
+        CacheMetrics metrics = 
grid(0).cache(name).metrics(grid(0).cluster().forCacheNodes(name));
+
+        CacheMetrics[] ms = new CacheMetrics[gridCount()];
+
+        for (int i = 0; i < gridCount(); i++)
+            ms[i] = grid(i).cache(name).metrics();
+
+        // Static metrics
+        for (int i = 0; i < gridCount(); i++)
+            assertEquals(metrics.id(), ms[i].id());
+
+        for (int i = 0; i < gridCount(); i++)
+            assertEquals(metrics.name(), ms[i].name());
+
+        // Dynamic metrics
+        assertEquals(metrics.getCacheGets(), sum(ms, new 
IgniteClosure<CacheMetrics, Long>() {
+            @Override public Long apply(CacheMetrics input) {
+                return input.getCacheGets();
+            }
+        }));
+
+        assertEquals(metrics.getCachePuts(), sum(ms, new 
IgniteClosure<CacheMetrics, Long>() {
+            @Override public Long apply(CacheMetrics input) {
+                return input.getCachePuts();
+            }
+        }));
+
+        assertEquals(metrics.getCacheHits(), sum(ms, new 
IgniteClosure<CacheMetrics, Long>() {
+            @Override public Long apply(CacheMetrics input) {
+                return input.getCacheHits();
+            }
+        }));
+    }
+
+    /**
+     * @param ms Cache metrics of each grid node.
+     * @param f Function extracting a single metric value from a node's cache metrics.
+     */
+    private long sum(CacheMetrics[] ms, IgniteClosure<CacheMetrics, Long> f) {
+        long res = 0;
+
+        for (int i = 0; i < gridCount(); i++)
+            res += f.apply(ms[i]);
+
+        return res;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d319bb5d/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PClassLoadingSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PClassLoadingSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PClassLoadingSelfTest.java
index 9700d94..c0e55b1 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PClassLoadingSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PClassLoadingSelfTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.p2p;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.compute.*;
 import org.apache.ignite.internal.util.lang.*;
@@ -115,6 +116,11 @@ public class GridP2PClassLoadingSelfTest extends 
GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
+        @Nullable @Override public Map<Integer, CacheMetrics> cacheMetrics() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
         @Nullable @Override public Map<String, Object> attributes() {
             return null;
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d319bb5d/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java 
b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java
index 5de1f14..58c2401 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.testframework;
 
+import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.util.lang.*;
@@ -56,6 +57,9 @@ public class GridTestNode extends GridMetadataAwareAdapter 
implements ClusterNod
     private ClusterMetrics metrics;
 
     /** */
+    private Map<Integer, CacheMetrics> cacheMetrics = Collections.emptyMap();
+
+    /** */
     private long order;
 
     /** */
@@ -187,6 +191,11 @@ public class GridTestNode extends GridMetadataAwareAdapter 
implements ClusterNod
     }
 
     /** {@inheritDoc} */
+    @Override public Map<Integer, CacheMetrics> cacheMetrics() {
+        return cacheMetrics;
+    }
+
+    /** {@inheritDoc} */
     @Override public long order() {
         return order != 0 ? order : (metrics == null ? -1 : 
metrics.getStartTime());
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d319bb5d/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java
index 7898c3d..e0acde9 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.testframework.junits.spi;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -348,6 +349,11 @@ public abstract class GridSpiAbstractTest<T extends 
IgniteSpi> extends GridAbstr
         return new DiscoveryMetricsProvider() {
             /** {@inheritDoc} */
             @Override public ClusterMetrics metrics() { return new 
ClusterMetricsSnapshot(); }
+
+            /** {@inheritDoc} */
+            @Override public Map<Integer, CacheMetrics> cacheMetrics() {
+                return Collections.emptyMap();
+            }
         };
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d319bb5d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java
index 511afec..1adf55f 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.testsuites;
 
 import junit.framework.*;
+import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.near.*;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.*;
 import org.apache.ignite.internal.processors.cache.local.*;
@@ -47,6 +48,9 @@ public class IgniteCacheMetricsSelfTestSuite extends 
TestSuite {
         
suite.addTestSuite(GridCacheAtomicPartitionedTckMetricsSelfTestImpl.class);
         suite.addTestSuite(GridCacheAtomicLocalTckMetricsSelfTestImpl.class);
 
+        // Cluster wide metrics.
+        suite.addTestSuite(CacheMetricsForClusterGroupSelfTest.class);
+
         return suite;
     }
 }

Reply via email to