ignite-366 Metrics for caches should work in clustered mode
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/6c1c33ec Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/6c1c33ec Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/6c1c33ec Branch: refs/heads/ignite-366 Commit: 6c1c33ecdb671b0019cb78f00671617542ae21df Parents: beedb17 Author: Andrey Gura <ag...@gridgain.com> Authored: Wed Apr 1 21:02:20 2015 +0300 Committer: agura <ag...@gridgain.com> Committed: Wed Apr 8 01:13:02 2015 +0300 ---------------------------------------------------------------------- .../java/org/apache/ignite/IgniteCache.java | 9 + .../org/apache/ignite/cache/CacheMetrics.java | 7 + .../org/apache/ignite/cluster/ClusterNode.java | 13 + .../discovery/GridDiscoveryManager.java | 17 ++ .../processors/cache/CacheMetricsImpl.java | 10 +- .../cache/CacheMetricsMXBeanImpl.java | 5 + .../processors/cache/CacheMetricsSnapshot.java | 239 ++++++++++++++++++- .../processors/cache/IgniteCacheProxy.java | 17 ++ .../spi/discovery/DiscoveryMetricsProvider.java | 10 + .../discovery/tcp/TcpClientDiscoverySpi.java | 38 ++- .../spi/discovery/tcp/TcpDiscoverySpi.java | 46 +++- .../tcp/internal/TcpDiscoveryNode.java | 24 ++ .../messages/TcpDiscoveryHeartbeatMessage.java | 94 ++++++++ .../ignite/internal/GridDiscoverySelfTest.java | 5 + .../CacheMetricsForClusterGroupSelfTest.java | 154 ++++++++++++ .../ignite/p2p/GridP2PClassLoadingSelfTest.java | 6 + .../ignite/testframework/GridTestNode.java | 9 + .../junits/spi/GridSpiAbstractTest.java | 6 + .../IgniteCacheMetricsSelfTestSuite.java | 4 + 19 files changed, 701 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6c1c33ec/modules/core/src/main/java/org/apache/ignite/IgniteCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java index 5c5bb25..cc0805e 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java @@ -22,6 +22,7 @@ import org.apache.ignite.cache.affinity.*; import org.apache.ignite.cache.affinity.rendezvous.*; import org.apache.ignite.cache.query.*; import org.apache.ignite.cache.store.*; +import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.lang.*; import org.apache.ignite.mxbean.*; @@ -514,6 +515,14 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS public CacheMetrics metrics(); /** + * Gets snapshot metrics for caches in cluster group. + * + * @param grp Cluster group. + * @return Cache metrics. + */ + public CacheMetrics metrics(ClusterGroup grp); + + /** * Gets MxBean for this cache. * * @return MxBean. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6c1c33ec/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java index 0d87326..dad0ddd 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java @@ -147,6 +147,13 @@ public interface CacheMetrics { public String name(); /** + * Gets ID of this cache. + * + * @return Cache ID. + */ + public int id(); + + /** * Gets number of entries that was swapped to disk. * * @return Number of entries that was swapped to disk. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6c1c33ec/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java index 9cb5d3d..2c7fd46 100644 --- a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java +++ b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java @@ -18,6 +18,7 @@ package org.apache.ignite.cluster; import org.apache.ignite.*; +import org.apache.ignite.cache.*; import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; @@ -148,6 +149,18 @@ public interface ClusterNode { public ClusterMetrics metrics(); /** + * Gets collections of cache metrics for this node. Note that node cache metrics are constantly updated + * and provide up to date information about caches. + * <p> + * Cache metrics are updated with some delay which is directly related to heartbeat + * frequency. For example, when used with default + * {@link org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi} the update will happen every {@code 2} seconds. + * + * @return Runtime metrics snapshots for this node. + */ + public Map<Integer, CacheMetrics> cacheMetrics(); + + /** * Gets all node attributes. Attributes are assigned to nodes at startup * via {@link org.apache.ignite.configuration.IgniteConfiguration#getUserAttributes()} method. * <p> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6c1c33ec/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 04ff423..15b6ba9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.managers.discovery; import org.apache.ignite.*; +import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.*; @@ -26,6 +27,7 @@ import org.apache.ignite.internal.managers.*; import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.managers.eventstorage.*; import org.apache.ignite.internal.processors.affinity.*; +import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.jobmetrics.*; import org.apache.ignite.internal.processors.security.*; import org.apache.ignite.internal.util.*; @@ -644,6 +646,21 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { return nm; } + + /** {@inheritDoc} */ + @Override public Map<Integer, CacheMetrics> cacheMetrics() { + Collection<GridCacheAdapter<?, ?>> caches = ctx.cache().internalCaches(); + + if (F.isEmpty(caches)) + return Collections.emptyMap(); + + Map<Integer, CacheMetrics> metrics = U.newHashMap(caches.size()); + + for (GridCacheAdapter<?, ?> cache : caches) + metrics.put(cache.context().cacheId(), cache.metrics()); + + return metrics; + } }; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6c1c33ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java index deebab4..8d9d02b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java @@ -110,6 +110,10 @@ public class CacheMetricsImpl implements CacheMetrics { this.delegate = delegate; } + /** {@inheritDoc} */ + @Override public int id() { + return cctx.cacheId(); + } /** {@inheritDoc} */ @Override public String name() { @@ -353,9 +357,8 @@ public class CacheMetricsImpl implements CacheMetrics { long misses0 = misses.get(); long reads0 = reads.get(); - if (misses0 == 0) { + if (misses0 == 0) return 0; - } return (float) misses0 / reads0 * 100.0f; } @@ -468,9 +471,8 @@ public class CacheMetricsImpl implements CacheMetrics { txCommits.incrementAndGet(); commitTimeNanos.addAndGet(duration); - if (delegate != null) { + if (delegate != null) delegate.onTxCommit(duration); - } } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6c1c33ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsMXBeanImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsMXBeanImpl.java index e9d547c..3dd206b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsMXBeanImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsMXBeanImpl.java @@ -39,6 +39,11 @@ class CacheMetricsMXBeanImpl implements CacheMetricsMXBean { } /** {@inheritDoc} */ + @Override public int id() { + return cache.context().cacheId(); + } + + /** {@inheritDoc} */ @Override public String name() { return cache.metrics0().name(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6c1c33ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java index 0391f4e..61ca68b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java @@ -20,10 +20,16 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.cache.*; import org.apache.ignite.internal.util.typedef.internal.*; +import java.io.*; +import java.util.*; + /** * Metrics snapshot. */ -class CacheMetricsSnapshot implements CacheMetrics { +public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + /** Number of reads. */ private long reads = 0; @@ -63,6 +69,9 @@ class CacheMetricsSnapshot implements CacheMetrics { /** Commit transaction time taken nanos. */ private float rollbackAvgTimeNanos = 0; + /** Cache ID. */ + private int id; + /** Cache name */ private String cacheName; @@ -178,6 +187,13 @@ class CacheMetricsSnapshot implements CacheMetrics { private boolean isWriteThrough; /** + * Default constructor. + */ + public CacheMetricsSnapshot() { + // No-op. + } + + /** * Create snapshot for given metrics. * * @param m Cache metrics. @@ -198,6 +214,7 @@ class CacheMetricsSnapshot implements CacheMetrics { commitAvgTimeNanos = m.getAverageTxCommitTime(); rollbackAvgTimeNanos = m.getAverageTxRollbackTime(); + id = m.id(); cacheName = m.name(); overflowSize = m.getOverflowSize(); offHeapEntriesCount = m.getOffHeapEntriesCount(); @@ -239,6 +256,134 @@ class CacheMetricsSnapshot implements CacheMetrics { isWriteThrough = m.isWriteThrough(); } + /** + * Constructs merged cache metrics. + * + * @param loc Metrics for cache on local node. + * @param metrics Metrics for merge. + */ + public CacheMetricsSnapshot(CacheMetrics loc, Collection<CacheMetrics> metrics) { + A.notEmpty(metrics, "metrics"); + + id = loc.id(); + cacheName = loc.name(); + isEmpty = loc.isEmpty(); + isWriteBehindEnabled = loc.isWriteBehindEnabled(); + writeBehindFlushSize = loc.getWriteBehindFlushSize(); + writeBehindFlushThreadCount = loc.getWriteBehindFlushThreadCount(); + writeBehindFlushFrequency = loc.getWriteBehindFlushFrequency(); + writeBehindStoreBatchSize = loc.getWriteBehindStoreBatchSize(); + writeBehindBufferSize = loc.getWriteBehindBufferSize(); + size = loc.getSize(); + keySize = loc.getKeySize(); + + keyType = loc.getKeyType(); + valueType = loc.getValueType(); + isStoreByValue = loc.isStoreByValue(); + isStatisticsEnabled = loc.isStatisticsEnabled(); + isManagementEnabled = loc.isManagementEnabled(); + isReadThrough = loc.isReadThrough(); + isWriteThrough = loc.isWriteThrough(); + + for (CacheMetrics e : metrics) { + reads += e.getCacheGets(); + puts += e.getCachePuts(); + hits += e.getCacheHits(); + misses += e.getCacheHits(); + txCommits += e.getCacheTxCommits(); + txRollbacks += e.getCacheTxRollbacks(); + evicts += e.getCacheEvictions(); + removes += e.getCacheRemovals(); + + putAvgTimeNanos += e.getAveragePutTime(); + getAvgTimeNanos += e.getAverageGetTime(); + removeAvgTimeNanos += e.getAverageRemoveTime(); + commitAvgTimeNanos += e.getAverageTxCommitTime(); + rollbackAvgTimeNanos += e.getAverageTxRollbackTime(); + + if (e.getOverflowSize() > -1) + overflowSize += e.getOverflowSize(); + else + overflowSize = -1; + + offHeapEntriesCount += e.getOffHeapEntriesCount(); + offHeapAllocatedSize += e.getOffHeapAllocatedSize(); + + if (e.getDhtEvictQueueCurrentSize() > -1) + dhtEvictQueueCurrentSize += e.getDhtEvictQueueCurrentSize(); + else + dhtEvictQueueCurrentSize = -1; + + txThreadMapSize += e.getTxThreadMapSize(); + txXidMapSize += e.getTxXidMapSize(); + txCommitQueueSize += e.getTxCommitQueueSize(); + txPrepareQueueSize += e.getTxPrepareQueueSize(); + txStartVersionCountsSize += e.getTxStartVersionCountsSize(); + txCommittedVersionsSize += e.getTxCommittedVersionsSize(); + txRolledbackVersionsSize += e.getTxRolledbackVersionsSize(); + + if (e.getTxDhtThreadMapSize() > -1) + txDhtThreadMapSize += e.getTxDhtThreadMapSize(); + else + txDhtThreadMapSize = -1; + + if (e.getTxDhtXidMapSize() > -1) + txDhtXidMapSize += e.getTxDhtXidMapSize(); + else + txDhtXidMapSize = -1; + + if (e.getTxDhtCommitQueueSize() > -1) + txDhtCommitQueueSize += e.getTxDhtCommitQueueSize(); + else + txDhtCommitQueueSize = -1; + + if (e.getTxDhtPrepareQueueSize() > -1) + txDhtPrepareQueueSize += e.getTxDhtPrepareQueueSize(); + else + txDhtPrepareQueueSize = -1; + + if (e.getTxDhtStartVersionCountsSize() > -1) + txDhtStartVersionCountsSize += e.getTxDhtStartVersionCountsSize(); + else + txDhtStartVersionCountsSize = -1; + + if (e.getTxDhtCommittedVersionsSize() > -1) + txDhtCommittedVersionsSize += e.getTxDhtCommittedVersionsSize(); + else + txDhtCommittedVersionsSize = -1; + + if (e.getTxDhtRolledbackVersionsSize() > -1) + txDhtRolledbackVersionsSize += e.getTxDhtRolledbackVersionsSize(); + else + txDhtRolledbackVersionsSize = -1; + + if (e.getWriteBehindTotalCriticalOverflowCount() > -1) + writeBehindTotalCriticalOverflowCount += e.getWriteBehindTotalCriticalOverflowCount(); + else + writeBehindTotalCriticalOverflowCount = -1; + + if (e.getWriteBehindCriticalOverflowCount() > -1) + writeBehindCriticalOverflowCount += e.getWriteBehindCriticalOverflowCount(); + else + writeBehindCriticalOverflowCount = -1; + + if (e.getWriteBehindErrorRetryCount() > -1) + writeBehindErrorRetryCount += e.getWriteBehindErrorRetryCount(); + else + writeBehindErrorRetryCount = -1; + } + + int size = metrics.size(); + + if (size > 1) { + putAvgTimeNanos /= size; + getAvgTimeNanos /= size; + removeAvgTimeNanos /= size; + commitAvgTimeNanos /= size; + rollbackAvgTimeNanos /= size; + } + } + /** {@inheritDoc} */ @Override public long getCacheHits() { return hits; @@ -259,9 +404,8 @@ class CacheMetricsSnapshot implements CacheMetrics { /** {@inheritDoc} */ @Override public float getCacheMissPercentage() { - if (misses == 0 || reads == 0) { + if (misses == 0 || reads == 0) return 0; - } return (float) misses / reads * 100.0f; } @@ -327,6 +471,11 @@ class CacheMetricsSnapshot implements CacheMetrics { } /** {@inheritDoc} */ + @Override public int id() { + return id; + } + + /** {@inheritDoc} */ @Override public long getOverflowSize() { return overflowSize; } @@ -515,4 +664,88 @@ class CacheMetricsSnapshot implements CacheMetrics { @Override public String toString() { return S.toString(CacheMetricsSnapshot.class, this); } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeInt(id); + + out.writeLong(reads); + out.writeLong(puts); + out.writeLong(hits); + out.writeLong(misses); + out.writeLong(txCommits); + out.writeLong(txRollbacks); + out.writeLong(evicts); + out.writeLong(removes); + + out.writeFloat(putAvgTimeNanos); + out.writeFloat(getAvgTimeNanos); + out.writeFloat(removeAvgTimeNanos); + out.writeFloat(commitAvgTimeNanos); + out.writeFloat(rollbackAvgTimeNanos); + + out.writeLong(overflowSize); + out.writeLong(offHeapEntriesCount); + out.writeLong(offHeapAllocatedSize); + out.writeInt(dhtEvictQueueCurrentSize); + out.writeInt(txThreadMapSize); + out.writeInt(txXidMapSize); + out.writeInt(txCommitQueueSize); + out.writeInt(txPrepareQueueSize); + out.writeInt(txStartVersionCountsSize); + out.writeInt(txCommittedVersionsSize); + out.writeInt(txRolledbackVersionsSize); + out.writeInt(txDhtThreadMapSize); + out.writeInt(txDhtXidMapSize); + out.writeInt(txDhtCommitQueueSize); + out.writeInt(txDhtPrepareQueueSize); + out.writeInt(txDhtStartVersionCountsSize); + out.writeInt(txDhtCommittedVersionsSize); + out.writeInt(txDhtRolledbackVersionsSize); + out.writeInt(writeBehindTotalCriticalOverflowCount); + out.writeInt(writeBehindCriticalOverflowCount); + out.writeInt(writeBehindErrorRetryCount); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + id = in.readInt(); + + reads = in.readLong(); + puts = in.readLong(); + hits = in.readLong(); + misses = in.readLong(); + txCommits = in.readLong(); + txRollbacks = in.readLong(); + evicts = in.readLong(); + removes = in.readLong(); + + putAvgTimeNanos = in.readFloat(); + getAvgTimeNanos = in.readFloat(); + removeAvgTimeNanos = in.readFloat(); + commitAvgTimeNanos = in.readFloat(); + rollbackAvgTimeNanos = in.readFloat(); + + overflowSize = in.readLong(); + offHeapEntriesCount = in.readLong(); + offHeapAllocatedSize = in.readLong(); + dhtEvictQueueCurrentSize = in.readInt(); + txThreadMapSize = in.readInt(); + txXidMapSize = in.readInt(); + txCommitQueueSize = in.readInt(); + txPrepareQueueSize = in.readInt(); + txStartVersionCountsSize = in.readInt(); + txCommittedVersionsSize = in.readInt(); + txRolledbackVersionsSize = in.readInt(); + txDhtThreadMapSize = in.readInt(); + txDhtXidMapSize = in.readInt(); + txDhtCommitQueueSize = in.readInt(); + txDhtPrepareQueueSize = in.readInt(); + txDhtStartVersionCountsSize = in.readInt(); + txDhtCommittedVersionsSize = in.readInt(); + txDhtRolledbackVersionsSize = in.readInt(); + writeBehindTotalCriticalOverflowCount = in.readInt(); + writeBehindCriticalOverflowCount = in.readInt(); + writeBehindErrorRetryCount = in.readInt(); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6c1c33ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index dfc3ef4..e0e3972 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -140,6 +140,23 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ + @Override public CacheMetrics metrics(ClusterGroup grp) { + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + + try { + List<CacheMetrics> metrics = new ArrayList<>(grp.nodes().size()); + + for (ClusterNode node : grp.nodes()) + metrics.add(node.cacheMetrics().get(context().cacheId())); + + return new CacheMetricsSnapshot(ctx.cache().metrics(), metrics); + } + finally { + gate.leave(prev); + } + } + + /** {@inheritDoc} */ @Override public CacheMetricsMXBean mxBean() { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6c1c33ec/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryMetricsProvider.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryMetricsProvider.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryMetricsProvider.java index 4a03278..c2bdc53 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryMetricsProvider.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryMetricsProvider.java @@ -17,9 +17,12 @@ package org.apache.ignite.spi.discovery; +import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.util.tostring.*; +import java.util.*; + /** * Provides metrics to discovery SPI. It is responsibility of discovery SPI * to make sure that all nodes have updated metrics data about each other. @@ -36,4 +39,11 @@ public interface DiscoveryMetricsProvider { * @return Up to date metrics data about local node. */ public ClusterMetrics metrics(); + + /** + * Returns metrics data about all caches on local node. + * + * @return metrics data about all caches on local node. + */ + public Map<Integer, CacheMetrics> cacheMetrics(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6c1c33ec/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java index bf69efb..e548488 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java @@ -18,6 +18,7 @@ package org.apache.ignite.spi.discovery.tcp; import org.apache.ignite.*; +import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.*; @@ -1067,7 +1068,11 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp Socket sock0 = sock; if (sock0 != null) { - msg.setMetrics(getLocalNodeId(), metricsProvider.metrics()); + UUID nodeId = ignite.configuration().getNodeId(); + + msg.setMetrics(nodeId, metricsProvider.metrics()); + + msg.setCacheMetrics(nodeId, metricsProvider.cacheMetrics()); try { writeToSocket(sock0, msg); @@ -1094,9 +1099,9 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp log.debug("Received heartbeat response: " + msg); } else { - if (msg.hasMetrics()) { - long tstamp = U.currentTimeMillis(); + long tstamp = U.currentTimeMillis(); + if (msg.hasMetrics()) { for (Map.Entry<UUID, MetricsSet> e : msg.metrics().entrySet()) { MetricsSet metricsSet = e.getValue(); @@ -1106,6 +1111,11 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp updateMetrics(t.get1(), t.get2(), tstamp); } } + + if (msg.hasCacheMetrics()) { + for (Map.Entry<UUID, Map<Integer, CacheMetrics>> e : msg.cacheMetrics().entrySet()) + updateCacheMetrics(e.getKey(), e.getValue(), tstamp); + } } } @@ -1171,6 +1181,28 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp } /** + * @param nodeId Node ID. + * @param cacheMetrics Cache metrics. + * @param tstamp Timestamp. + */ + private void updateCacheMetrics(UUID nodeId, Map<Integer, CacheMetrics> cacheMetrics, long tstamp) { + assert nodeId != null; + assert cacheMetrics != null; + + TcpDiscoveryNode node = nodeId.equals(ignite.configuration().getNodeId()) ? locNode : rmtNodes.get(nodeId); + + if (node != null && node.visible()) { + node.setCacheMetrics(cacheMetrics); + + node.lastUpdateTime(tstamp); + + notifyDiscovery(EVT_NODE_METRICS_UPDATED, topVer, node, allNodes()); + } + else if (log.isDebugEnabled()) + log.debug("Received cacheMetrics from unknown node: " + nodeId); + } + + /** * @param topVer New topology version. * @return Latest topology snapshot. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6c1c33ec/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index bad8837..57a8869 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -18,6 +18,7 @@ package org.apache.ignite.spi.discovery.tcp; import org.apache.ignite.*; +import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.events.DiscoveryEvent; @@ -4240,7 +4241,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov return; } - if (locNodeId.equals(msg.creatorNodeId()) && !msg.hasMetrics(locNodeId) && msg.senderNodeId() != null) { + if (locNodeId.equals(msg.creatorNodeId()) && !hasMetrics(msg, locNodeId) && msg.senderNodeId() != null) { if (log.isDebugEnabled()) log.debug("Discarding heartbeat message that has made two passes: " + msg); @@ -4260,14 +4261,21 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov updateMetrics(t.get1(), t.get2(), tstamp); } } + + if (msg.hasCacheMetrics()) { + for (Map.Entry<UUID, Map<Integer, CacheMetrics>> e : msg.cacheMetrics().entrySet()) + updateCacheMetrics(e.getKey(), e.getValue(), tstamp); + } } if (ring.hasRemoteNodes()) { if ((locNodeId.equals(msg.creatorNodeId()) && msg.senderNodeId() == null || - !msg.hasMetrics(locNodeId)) && spiStateCopy() == CONNECTED) { + !hasMetrics(msg, locNodeId)) && spiStateCopy() == CONNECTED) { // Message is on its first ring or just created on coordinator. msg.setMetrics(locNodeId, metricsProvider.metrics()); + msg.setCacheMetrics(locNodeId, metricsProvider.cacheMetrics()); + for (Map.Entry<UUID, ClientMessageWorker> e : clientMsgWorkers.entrySet()) { UUID nodeId = e.getKey(); ClusterMetrics metrics = e.getValue().metrics(); @@ -4282,6 +4290,8 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov // Message is on its second ring. msg.removeMetrics(locNodeId); + msg.removeCacheMetrics(locNodeId); + Collection<UUID> clientNodeIds = msg.clientNodeIds(); for (TcpDiscoveryNode clientNode : ring.clientNodes()) { @@ -4333,6 +4343,35 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov } /** + * @param nodeId Node ID. + * @param cacheMetrics Cache metrics. + * @param tstamp Timestamp. + */ + private void updateCacheMetrics(UUID nodeId, Map<Integer, CacheMetrics> cacheMetrics, long tstamp) { + assert nodeId != null; + assert cacheMetrics != null; + + TcpDiscoveryNode node = ring.node(nodeId); + + if (node != null && node.visible()) { + node.setCacheMetrics(cacheMetrics); + + node.lastUpdateTime(tstamp); + + notifyDiscovery(EVT_NODE_METRICS_UPDATED, ring.topologyVersion(), node); + } + else if (log.isDebugEnabled()) + log.debug("Received cacheMetrics from unknown node: " + nodeId); + } + + /** + * @param msg Message. + */ + private boolean hasMetrics(TcpDiscoveryHeartbeatMessage msg, UUID nodeId) { + return msg.hasMetrics(nodeId) || msg.hasCacheMetrics(nodeId); + } + + /** * Processes discard message and discards previously registered pending messages. * * @param msg Discard message. @@ -5116,7 +5155,10 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov hbMsg.removeMetrics(nodeId); + hbMsg.removeCacheMetrics(nodeId); + assert !hbMsg.hasMetrics(); + assert !hbMsg.hasCacheMetrics(); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6c1c33ec/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java index 450dd8c..3cafd4f 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java @@ -17,6 +17,7 @@ package org.apache.ignite.spi.discovery.tcp.internal; +import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.lang.*; @@ -73,6 +74,10 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste @GridToStringExclude private volatile ClusterMetrics metrics; + /** Node cache metrics. */ + @GridToStringExclude + private volatile Map<Integer, CacheMetrics> cacheMetrics; + /** Node order in the topology. */ private volatile long order; @@ -209,6 +214,25 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste this.metrics = metrics; } + /** {@inheritDoc} */ + @Override public Map<Integer, CacheMetrics> cacheMetrics() { + if (metricsProvider != null) + cacheMetrics = metricsProvider.cacheMetrics(); + + return cacheMetrics; + } + + /** + * Sets node cache metrics. + * + * @param cacheMetrics Cache metrics. + */ + public void setCacheMetrics(Map<Integer, CacheMetrics> cacheMetrics) { + assert cacheMetrics != null; + + this.cacheMetrics = cacheMetrics; + } + /** * @return Internal order. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6c1c33ec/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java index 65eea9f..05268d3 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java @@ -17,8 +17,10 @@ package org.apache.ignite.spi.discovery.tcp.messages; +import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -52,6 +54,10 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage { /** Client node IDs. */ private Collection<UUID> clientNodeIds; + /** Cahce metrics by node. */ + @GridToStringExclude + private Map<UUID, Map<Integer, CacheMetrics>> cacheMetrics; + /** * Public default no-arg constructor for {@link Externalizable} interface. */ @@ -68,6 +74,7 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage { super(creatorNodeId); metrics = U.newHashMap(1); + cacheMetrics = U.newHashMap(1); clientNodeIds = new HashSet<>(); } @@ -86,6 +93,20 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage { } /** + * Sets cache metrics for particular node. + * + * @param nodeId Node ID. + * @param metrics Node cache metrics. + */ + public void setCacheMetrics(UUID nodeId, Map<Integer, CacheMetrics> metrics) { + assert nodeId != null; + assert metrics != null; + assert !this.cacheMetrics.containsKey(nodeId); + + this.cacheMetrics.put(nodeId, metrics); + } + + /** * Sets metrics for a client node. * * @param nodeId Server node ID. @@ -113,6 +134,17 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage { } /** + * Removes cache metrics for particular node from the message. + * + * @param nodeId Node ID. + */ + public void removeCacheMetrics(UUID nodeId) { + assert nodeId != null; + + cacheMetrics.remove(nodeId); + } + + /** * Gets metrics map. * * @return Metrics map. @@ -122,6 +154,15 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage { } /** + * Gets cache metrics map. + * + * @return Cache metrics map. + */ + public Map<UUID, Map<Integer, CacheMetrics>> cacheMetrics() { + return cacheMetrics; + } + + /** * @return {@code True} if this message contains metrics. */ public boolean hasMetrics() { @@ -129,6 +170,13 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage { } /** + * @return {@code True} this message contains cache metrics. + */ + public boolean hasCacheMetrics() { + return !cacheMetrics.isEmpty(); + } + + /** * @return {@code True} if this message contains metrics. */ public boolean hasMetrics(UUID nodeId) { @@ -138,6 +186,17 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage { } /** + * @param nodeId Node ID. + * + * @return {@code True} if this message contains cache metrics for particular node. + */ + public boolean hasCacheMetrics(UUID nodeId) { + assert nodeId != null; + + return cacheMetrics.get(nodeId) != null; + } + + /** * Gets client node IDs for particular node. * * @return Client node IDs. @@ -168,6 +227,21 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage { } } + out.writeInt(cacheMetrics.size()); + + if (!cacheMetrics.isEmpty()) { + for (Map.Entry<UUID, Map<Integer, CacheMetrics>> e : cacheMetrics.entrySet()) { + U.writeUuid(out, e.getKey()); + + Map<Integer, CacheMetrics> ms = e.getValue(); + + out.writeInt(ms == null ? 0 : ms.size()); + + for (Map.Entry<Integer, CacheMetrics> m : ms.entrySet()) + out.writeObject(m.getValue()); + } + } + U.writeCollection(out, clientNodeIds); } @@ -182,6 +256,26 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage { for (int i = 0; i < metricsSize; i++) metrics.put(U.readUuid(in), (MetricsSet)in.readObject()); + int cacheMetricsSize = in.readInt(); + + cacheMetrics = U.newHashMap(cacheMetricsSize); + + for (int i = 0; i < cacheMetricsSize; i++) { + UUID uuid = U.readUuid(in); + + int size = in.readInt(); + + Map<Integer, CacheMetrics> ms = U.newHashMap(size); + + for (int j = 0; j < size; j++) { + CacheMetricsSnapshot m = (CacheMetricsSnapshot) in.readObject(); + + ms.put(m.id(), m); + } + + cacheMetrics.put(uuid, ms); + } + clientNodeIds = U.readCollection(in); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6c1c33ec/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java index 4a2483d..96a1729 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java @@ -376,6 +376,11 @@ public class GridDiscoverySelfTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ + @Nullable @Override public Map<Integer, CacheMetrics> cacheMetrics() { + return null; + } + + /** {@inheritDoc} */ @Nullable @Override public Map<String, Object> attributes() { return null; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6c1c33ec/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java new file mode 100644 index 0000000..a3ffca9 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; + +/** + * Test for cluster wide cache metrics. + */ +public class CacheMetricsForClusterGroupSelfTest extends GridCacheAbstractSelfTest { + /** Grid count. */ + private static final int GRID_CNT = 3; + + /** Cache 1. */ + private static final String CACHE1 = "cache1"; + + /** Cache 2. */ + private static final String CACHE2 = "cache2"; + + /** Entry count cache 1. */ + private static final int ENTRY_CNT_CACHE1 = 1000; + + /** Entry count cache 2. */ + private static final int ENTRY_CNT_CACHE2 = 500; + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return GRID_CNT; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + CacheConfiguration ccfg1 = defaultCacheConfiguration(); + ccfg1.setName(CACHE1); + ccfg1.setStatisticsEnabled(true); + + CacheConfiguration ccfg2 = defaultCacheConfiguration(); + ccfg2.setName(CACHE2); + ccfg2.setStatisticsEnabled(true); + + grid(0).getOrCreateCache(ccfg1); + grid(0).getOrCreateCache(ccfg2); + } + + /** + * Test cluster group metrics. + */ + public void testMetrics() throws Exception { + populateCacheData(CACHE1, ENTRY_CNT_CACHE1); + populateCacheData(CACHE2, ENTRY_CNT_CACHE2); + + readCacheData(CACHE1, ENTRY_CNT_CACHE1); + readCacheData(CACHE2, ENTRY_CNT_CACHE2); + + // Wait for heartbeat message + Thread.sleep(3000); + + assertMetrics(CACHE1); + assertMetrics(CACHE2); + } + + /** + * @param name Name. + * @param cnt Count. + */ + private void populateCacheData(String name, int cnt) { + IgniteCache<Integer, Integer> cache = grid(0).cache(name); + + for (int i = 0; i < cnt; i++) + cache.put(i, i); + } + + /** + * @param name Name. + * @param cnt Count. + */ + private void readCacheData(String name, int cnt) { + IgniteCache<Integer, Integer> cache = grid(0).cache(name); + + for (int i = 0; i < cnt; i++) + cache.get(i); + } + + /** + * @param name Name. + */ + private void assertMetrics(String name) { + CacheMetrics metrics = grid(0).cache(name).metrics(grid(0).cluster().forCacheNodes(name)); + + CacheMetrics[] ms = new CacheMetrics[gridCount()]; + + for (int i = 0; i < gridCount(); i++) + ms[i] = grid(i).cache(name).metrics(); + + // Static metrics + for (int i = 0; i < gridCount(); i++) + assertEquals(metrics.id(), ms[i].id()); + + for (int i = 0; i < gridCount(); i++) + assertEquals(metrics.name(), ms[i].name()); + + // Dynamic metrics + assertEquals(metrics.getCacheGets(), sum(ms, new IgniteClosure<CacheMetrics, Long>() { + @Override public Long apply(CacheMetrics input) { + return input.getCacheGets(); + } + })); + + assertEquals(metrics.getCachePuts(), sum(ms, new IgniteClosure<CacheMetrics, Long>() { + @Override public Long apply(CacheMetrics input) { + return input.getCachePuts(); + } + })); + + assertEquals(metrics.getCacheHits(), sum(ms, new IgniteClosure<CacheMetrics, Long>() { + @Override public Long apply(CacheMetrics input) { + return input.getCacheHits(); + } + })); + } + + /** + * @param ms Milliseconds. + * @param f Function. + */ + private long sum(CacheMetrics[] ms, IgniteClosure<CacheMetrics, Long> f) { + long res = 0; + + for (int i = 0; i < gridCount(); i++) + res += f.apply(ms[i]); + + return res; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6c1c33ec/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PClassLoadingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PClassLoadingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PClassLoadingSelfTest.java index 9700d94..c0e55b1 100644 --- a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PClassLoadingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PClassLoadingSelfTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.p2p; import org.apache.ignite.*; +import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.compute.*; import org.apache.ignite.internal.util.lang.*; @@ -115,6 +116,11 @@ public class GridP2PClassLoadingSelfTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ + @Nullable @Override public Map<Integer, CacheMetrics> cacheMetrics() { + return null; + } + + /** {@inheritDoc} */ @Nullable @Override public Map<String, Object> attributes() { return null; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6c1c33ec/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java index 5de1f14..58c2401 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java @@ -17,6 +17,7 @@ package org.apache.ignite.testframework; +import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.lang.*; @@ -56,6 +57,9 @@ public class GridTestNode extends GridMetadataAwareAdapter implements ClusterNod private ClusterMetrics metrics; /** */ + private Map<Integer, CacheMetrics> cacheMetrics = Collections.emptyMap(); + + /** */ private long order; /** */ @@ -187,6 +191,11 @@ public class GridTestNode extends GridMetadataAwareAdapter implements ClusterNod } /** {@inheritDoc} */ + @Override public Map<Integer, CacheMetrics> cacheMetrics() { + return cacheMetrics; + } + + /** {@inheritDoc} */ @Override public long order() { return order != 0 ? order : (metrics == null ? -1 : metrics.getStartTime()); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6c1c33ec/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java index 7898c3d..e0acde9 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.testframework.junits.spi; import org.apache.ignite.*; +import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -348,6 +349,11 @@ public abstract class GridSpiAbstractTest<T extends IgniteSpi> extends GridAbstr return new DiscoveryMetricsProvider() { /** {@inheritDoc} */ @Override public ClusterMetrics metrics() { return new ClusterMetricsSnapshot(); } + + /** {@inheritDoc} */ + @Override public Map<Integer, CacheMetrics> cacheMetrics() { + return Collections.emptyMap(); + } }; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6c1c33ec/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java index 511afec..1adf55f 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java @@ -18,6 +18,7 @@ package org.apache.ignite.testsuites; import junit.framework.*; +import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; import org.apache.ignite.internal.processors.cache.distributed.replicated.*; import org.apache.ignite.internal.processors.cache.local.*; @@ -47,6 +48,9 @@ public class IgniteCacheMetricsSelfTestSuite extends TestSuite { suite.addTestSuite(GridCacheAtomicPartitionedTckMetricsSelfTestImpl.class); suite.addTestSuite(GridCacheAtomicLocalTckMetricsSelfTestImpl.class); + // Cluster wide metrics. + suite.addTestSuite(CacheMetricsForClusterGroupSelfTest.class); + return suite; } }