ignite-366 Metrics for caches should work in clustered mode
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e5eeefd7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e5eeefd7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e5eeefd7 Branch: refs/heads/ignite-366 Commit: e5eeefd7fe3966a1cf162dc2d4bbf84831113600 Parents: cc44804 Author: Andrey Gura <[email protected]> Authored: Fri Apr 3 20:10:29 2015 +0300 Committer: Andrey Gura <[email protected]> Committed: Fri Apr 3 20:10:29 2015 +0300 ---------------------------------------------------------------------- .../java/org/apache/ignite/IgniteCache.java | 4 +- .../org/apache/ignite/cache/CacheMetrics.java | 7 + .../org/apache/ignite/cluster/ClusterNode.java | 2 +- .../discovery/GridDiscoveryManager.java | 10 +- .../processors/cache/CacheMetricsImpl.java | 10 +- .../cache/CacheMetricsMXBeanImpl.java | 5 + .../processors/cache/CacheMetricsSnapshot.java | 230 +++++++++---------- .../processors/cache/IgniteCacheProxy.java | 20 +- .../spi/discovery/DiscoveryMetricsProvider.java | 2 +- .../discovery/tcp/TcpClientDiscoverySpi.java | 4 +- .../spi/discovery/tcp/TcpDiscoverySpi.java | 4 +- .../tcp/internal/TcpDiscoveryNode.java | 6 +- .../messages/TcpDiscoveryHeartbeatMessage.java | 16 +- .../ignite/internal/GridDiscoverySelfTest.java | 2 +- .../CacheMetricsForClusterGroupSelfTest.java | 154 +++++++++++++ ...hePartitionedMetricsForClusterGroupTest.java | 72 ------ .../ignite/p2p/GridP2PClassLoadingSelfTest.java | 2 +- .../ignite/testframework/GridTestNode.java | 4 +- .../junits/spi/GridSpiAbstractTest.java | 4 +- .../IgniteCacheMetricsSelfTestSuite.java | 4 + 20 files changed, 321 insertions(+), 241 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e5eeefd7/modules/core/src/main/java/org/apache/ignite/IgniteCache.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java index d133ab7..e7fc1cf 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java @@ -517,9 +517,9 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS /** * Gets snapshot metrics for caches in cluster group. * @param grp Cluster group. - * @return Cache metrics. + * @return Cache metrics. {@code Null} if cluster group is empty. */ - public CacheMetrics metrics(ClusterGroup grp); + @Nullable public CacheMetrics metrics(ClusterGroup grp); /** * Gets MxBean for this cache. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e5eeefd7/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java index 0d87326..dad0ddd 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java @@ -147,6 +147,13 @@ public interface CacheMetrics { public String name(); /** + * Gets ID of this cache. + * + * @return Cache ID. + */ + public int id(); + + /** * Gets number of entries that was swapped to disk. * * @return Number of entries that was swapped to disk. 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e5eeefd7/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java index bc1636e..3b38ec6 100644 --- a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java +++ b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java @@ -159,7 +159,7 @@ public interface ClusterNode { * * @return Runtime metrics snapshots for this node. */ - public Map<String, CacheMetrics> cacheMetrics(); + public Map<Integer, CacheMetrics> cacheMetrics(); /** * Gets all node attributes. Attributes are assigned to nodes at startup http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e5eeefd7/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 76afa73..15b6ba9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -648,16 +648,16 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** {@inheritDoc} */ - @Override public Map<String, CacheMetrics> cacheMetrics() { - Collection<GridCache<?, ?>> caches = ctx.cache().caches(); + @Override public Map<Integer, CacheMetrics> cacheMetrics() { + Collection<GridCacheAdapter<?, ?>> caches = ctx.cache().internalCaches(); if (F.isEmpty(caches)) return Collections.emptyMap(); - Map<String, CacheMetrics> 
metrics = U.newHashMap(caches.size()); + Map<Integer, CacheMetrics> metrics = U.newHashMap(caches.size()); - for (GridCache<?, ?> cache : caches) - metrics.put(cache.name(), cache.metrics()); + for (GridCacheAdapter<?, ?> cache : caches) + metrics.put(cache.context().cacheId(), cache.metrics()); return metrics; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e5eeefd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java index deebab4..8d9d02b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java @@ -110,6 +110,10 @@ public class CacheMetricsImpl implements CacheMetrics { this.delegate = delegate; } + /** {@inheritDoc} */ + @Override public int id() { + return cctx.cacheId(); + } /** {@inheritDoc} */ @Override public String name() { @@ -353,9 +357,8 @@ public class CacheMetricsImpl implements CacheMetrics { long misses0 = misses.get(); long reads0 = reads.get(); - if (misses0 == 0) { + if (misses0 == 0) return 0; - } return (float) misses0 / reads0 * 100.0f; } @@ -468,9 +471,8 @@ public class CacheMetricsImpl implements CacheMetrics { txCommits.incrementAndGet(); commitTimeNanos.addAndGet(duration); - if (delegate != null) { + if (delegate != null) delegate.onTxCommit(duration); - } } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e5eeefd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsMXBeanImpl.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsMXBeanImpl.java index e9d547c..3dd206b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsMXBeanImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsMXBeanImpl.java @@ -39,6 +39,11 @@ class CacheMetricsMXBeanImpl implements CacheMetricsMXBean { } /** {@inheritDoc} */ + @Override public int id() { + return cache.context().cacheId(); + } + + /** {@inheritDoc} */ @Override public String name() { return cache.metrics0().name(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e5eeefd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java index 4e6a256..b8db5b5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java @@ -27,8 +27,8 @@ import java.util.*; * Metrics snapshot. */ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { - /** Last update time. */ - private long lastUpdateTime = -1; + /** */ + private static final long serialVersionUID = 0L; /** Number of reads. */ private long reads = 0; @@ -69,6 +69,9 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { /** Commit transaction time taken nanos. */ private float rollbackAvgTimeNanos = 0; + /** Cache ID. 
*/ + private int id; + /** Cache name */ private String cacheName; @@ -211,6 +214,7 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { commitAvgTimeNanos = m.getAverageTxCommitTime(); rollbackAvgTimeNanos = m.getAverageTxRollbackTime(); + id = m.id(); cacheName = m.name(); overflowSize = m.getOverflowSize(); offHeapEntriesCount = m.getOffHeapEntriesCount(); @@ -253,70 +257,35 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { } /** - * Constructs merged cachemetrics. + * Constructs merged cache metrics. * + * @param loc Metrics for cache on local node. * @param metrics Metrics for merge. */ - public CacheMetricsSnapshot(Collection<CacheMetrics> metrics) { + public CacheMetricsSnapshot(CacheMetrics loc, Collection<CacheMetrics> metrics) { A.notEmpty(metrics, "metrics"); - // TODO: Only once - /* - cacheName - isEmpty - isWriteBehindEnabled - writeBehindFlushSize - writeBehindFlushThreadCount - writeBehindFlushFrequency - writeBehindStoreBatchSize - writeBehindTotalCriticalOverflowCount - writeBehindCriticalOverflowCount - writeBehindErrorRetryCount - writeBehindBufferSize - - keyType - valueType - isStoreByValue - isStatisticsEnabled - isManagementEnabled - isReadThrough - isWriteThrough - */ - - //TODO: How to merge? - /* - overflowSize - size (only once PRIMARY?) 
- */ - - boolean first = true; + id = loc.id(); + cacheName = loc.name(); + isEmpty = loc.isEmpty(); + isWriteBehindEnabled = loc.isWriteBehindEnabled(); + writeBehindFlushSize = loc.getWriteBehindFlushSize(); + writeBehindFlushThreadCount = loc.getWriteBehindFlushThreadCount(); + writeBehindFlushFrequency = loc.getWriteBehindFlushFrequency(); + writeBehindStoreBatchSize = loc.getWriteBehindStoreBatchSize(); + writeBehindBufferSize = loc.getWriteBehindBufferSize(); + size = loc.getSize(); + keySize = loc.getKeySize(); + + keyType = loc.getKeyType(); + valueType = loc.getValueType(); + isStoreByValue = loc.isStoreByValue(); + isStatisticsEnabled = loc.isStatisticsEnabled(); + isManagementEnabled = loc.isManagementEnabled(); + isReadThrough = loc.isReadThrough(); + isWriteThrough = loc.isWriteThrough(); for (CacheMetrics e : metrics) { - - if (first) { - cacheName = e.name(); - isEmpty = e.isEmpty(); - isWriteBehindEnabled = e.isWriteBehindEnabled(); - writeBehindFlushSize = e.getWriteBehindFlushSize(); - writeBehindFlushThreadCount = e.getWriteBehindFlushThreadCount(); - writeBehindFlushFrequency = e.getWriteBehindFlushFrequency(); - writeBehindStoreBatchSize = e.getWriteBehindStoreBatchSize(); - writeBehindTotalCriticalOverflowCount = e.getWriteBehindTotalCriticalOverflowCount(); - writeBehindCriticalOverflowCount = e.getWriteBehindCriticalOverflowCount(); - writeBehindErrorRetryCount = e.getWriteBehindErrorRetryCount(); - writeBehindBufferSize = e.getWriteBehindBufferSize(); - - keyType = e.getKeyType(); - valueType = e.getValueType(); - isStoreByValue = e.isStoreByValue(); - isStatisticsEnabled = e.isStatisticsEnabled(); - isManagementEnabled = e.isManagementEnabled(); - isReadThrough = e.isReadThrough(); - isWriteThrough = e.isWriteThrough(); - - first = false; - } - reads += e.getCacheGets(); puts += e.getCachePuts(); hits += e.getCacheHits(); @@ -332,10 +301,17 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { commitAvgTimeNanos += 
e.getAverageTxCommitTime(); rollbackAvgTimeNanos += e.getAverageTxRollbackTime(); + if (e.getOverflowSize() > -1) + overflowSize += e.getOverflowSize(); + offHeapEntriesCount += e.getOffHeapEntriesCount(); offHeapAllocatedSize += e.getOffHeapAllocatedSize(); - keySize += e.getKeySize(); - dhtEvictQueueCurrentSize += e.getDhtEvictQueueCurrentSize(); + + if (e.getDhtEvictQueueCurrentSize() > -1) + dhtEvictQueueCurrentSize += e.getDhtEvictQueueCurrentSize(); + else + dhtEvictQueueCurrentSize = -1; + txThreadMapSize += e.getTxThreadMapSize(); txXidMapSize += e.getTxXidMapSize(); txCommitQueueSize += e.getTxCommitQueueSize(); @@ -343,38 +319,67 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { txStartVersionCountsSize += e.getTxStartVersionCountsSize(); txCommittedVersionsSize += e.getTxCommittedVersionsSize(); txRolledbackVersionsSize += e.getTxRolledbackVersionsSize(); - txDhtThreadMapSize += e.getTxDhtThreadMapSize(); - txDhtXidMapSize += e.getTxDhtXidMapSize(); - txDhtCommitQueueSize += e.getTxDhtCommitQueueSize(); - txDhtPrepareQueueSize += e.getTxDhtPrepareQueueSize(); - txDhtStartVersionCountsSize += e.getTxDhtStartVersionCountsSize(); - txDhtCommittedVersionsSize += e.getTxDhtCommittedVersionsSize(); - txDhtRolledbackVersionsSize += e.getTxDhtRolledbackVersionsSize(); - } - putAvgTimeNanos /= metrics.size(); - getAvgTimeNanos /= metrics.size(); - removeAvgTimeNanos /= metrics.size(); - commitAvgTimeNanos /= metrics.size(); - rollbackAvgTimeNanos /= metrics.size(); - } + if (e.getTxDhtThreadMapSize() > -1) + txDhtThreadMapSize += e.getTxDhtThreadMapSize(); + else + txDhtThreadMapSize = -1; + + if (e.getTxDhtXidMapSize() > -1) + txDhtXidMapSize += e.getTxDhtXidMapSize(); + else + txDhtXidMapSize = -1; + + if (e.getTxDhtCommitQueueSize() > -1) + txDhtCommitQueueSize += e.getTxDhtCommitQueueSize(); + else + txDhtCommitQueueSize = -1; + + if (e.getTxDhtPrepareQueueSize() > -1) + txDhtPrepareQueueSize += e.getTxDhtPrepareQueueSize(); + 
else + txDhtPrepareQueueSize = -1; + + if (e.getTxDhtStartVersionCountsSize() > -1) + txDhtStartVersionCountsSize += e.getTxDhtStartVersionCountsSize(); + else + txDhtStartVersionCountsSize = -1; + + if (e.getTxDhtCommittedVersionsSize() > -1) + txDhtCommittedVersionsSize += e.getTxDhtCommittedVersionsSize(); + else + txDhtCommittedVersionsSize = -1; + + if (e.getTxDhtRolledbackVersionsSize() > -1) + txDhtRolledbackVersionsSize += e.getTxDhtRolledbackVersionsSize(); + else + txDhtRolledbackVersionsSize = -1; + + if (e.getWriteBehindTotalCriticalOverflowCount() > -1) + writeBehindTotalCriticalOverflowCount += e.getWriteBehindTotalCriticalOverflowCount(); + else + writeBehindTotalCriticalOverflowCount = -1; + + if (e.getWriteBehindCriticalOverflowCount() > -1) + writeBehindCriticalOverflowCount += e.getWriteBehindCriticalOverflowCount(); + else + writeBehindCriticalOverflowCount = -1; + + if (e.getWriteBehindErrorRetryCount() > -1) + writeBehindErrorRetryCount += e.getWriteBehindErrorRetryCount(); + else + writeBehindErrorRetryCount = -1; + } - /** - * Gets last update time of this node metrics. - * - * @return Last update time. - */ - public long getLastUpdateTime() { - return lastUpdateTime; - } + int size = metrics.size(); - /** - * Sets last update time. - * - * @param lastUpdateTime Last update time. 
- */ - public void setLastUpdateTime(long lastUpdateTime) { - this.lastUpdateTime = lastUpdateTime; + if (size > 1) { + putAvgTimeNanos /= size; + getAvgTimeNanos /= size; + removeAvgTimeNanos /= size; + commitAvgTimeNanos /= size; + rollbackAvgTimeNanos /= size; + } } /** {@inheritDoc} */ @@ -481,6 +486,11 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { } /** {@inheritDoc} */ + @Override public int id() { + return id; + } + + /** {@inheritDoc} */ @Override public long getOverflowSize() { return overflowSize; } @@ -672,6 +682,8 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeInt(id); + out.writeLong(reads); out.writeLong(puts); out.writeLong(hits); @@ -687,13 +699,9 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { out.writeFloat(commitAvgTimeNanos); out.writeFloat(rollbackAvgTimeNanos); - U.writeUTFStringNullable(out, cacheName); out.writeLong(overflowSize); out.writeLong(offHeapEntriesCount); out.writeLong(offHeapAllocatedSize); - out.writeInt(size); - out.writeInt(keySize); - out.writeBoolean(isEmpty); out.writeInt(dhtEvictQueueCurrentSize); out.writeInt(txThreadMapSize); out.writeInt(txXidMapSize); @@ -709,27 +717,15 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { out.writeInt(txDhtStartVersionCountsSize); out.writeInt(txDhtCommittedVersionsSize); out.writeInt(txDhtRolledbackVersionsSize); - out.writeBoolean(isWriteBehindEnabled); - out.writeInt(writeBehindFlushSize); - out.writeInt(writeBehindFlushThreadCount); - out.writeLong(writeBehindFlushFrequency); - out.writeInt(writeBehindStoreBatchSize); out.writeInt(writeBehindTotalCriticalOverflowCount); out.writeInt(writeBehindCriticalOverflowCount); out.writeInt(writeBehindErrorRetryCount); - out.writeInt(writeBehindBufferSize); - - out.writeUTF(keyType); - out.writeUTF(valueType); - 
out.writeBoolean(isStoreByValue); - out.writeBoolean(isStatisticsEnabled); - out.writeBoolean(isManagementEnabled); - out.writeBoolean(isReadThrough); - out.writeBoolean(isWriteThrough); } /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + id = in.readInt(); + reads = in.readLong(); puts = in.readLong(); hits = in.readLong(); @@ -745,13 +741,9 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { commitAvgTimeNanos = in.readFloat(); rollbackAvgTimeNanos = in.readFloat(); - cacheName = U.readUTFStringNullable(in); overflowSize = in.readLong(); offHeapEntriesCount = in.readLong(); offHeapAllocatedSize = in.readLong(); - size = in.readInt(); - keySize = in.readInt(); - isEmpty = in.readBoolean(); dhtEvictQueueCurrentSize = in.readInt(); txThreadMapSize = in.readInt(); txXidMapSize = in.readInt(); @@ -767,22 +759,8 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { txDhtStartVersionCountsSize = in.readInt(); txDhtCommittedVersionsSize = in.readInt(); txDhtRolledbackVersionsSize = in.readInt(); - isWriteBehindEnabled = in.readBoolean(); - writeBehindFlushSize = in.readInt(); - writeBehindFlushThreadCount = in.readInt(); - writeBehindFlushFrequency = in.readLong(); - writeBehindStoreBatchSize = in.readInt(); writeBehindTotalCriticalOverflowCount = in.readInt(); writeBehindCriticalOverflowCount = in.readInt(); writeBehindErrorRetryCount = in.readInt(); - writeBehindBufferSize = in.readInt(); - - keyType = in.readUTF(); - valueType = in.readUTF(); - isStoreByValue = in.readBoolean(); - isStatisticsEnabled = in.readBoolean(); - isManagementEnabled = in.readBoolean(); - isReadThrough = in.readBoolean(); - isWriteThrough = in.readBoolean(); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e5eeefd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 6b7d709..c2d7ab3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -140,18 +140,20 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ - @Override public CacheMetrics metrics(ClusterGroup grp) { - List<CacheMetrics> metrics = new ArrayList<>(grp.nodes().size()); + @Nullable @Override public CacheMetrics metrics(ClusterGroup grp) { + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + + try { + List<CacheMetrics> metrics = new ArrayList<>(grp.nodes().size()); - for (ClusterNode node : grp.nodes()) - metrics.add(node.cacheMetrics().get(getName())); + for (ClusterNode node : grp.nodes()) + metrics.add(node.cacheMetrics().get(context().cacheId())); - if (F.isEmpty(metrics)) { - //TODO: what actually need to be returned? - return new CacheMetricsSnapshot(); + return F.isEmpty(metrics) ? 
null : new CacheMetricsSnapshot(ctx.cache().metrics(), metrics); + } + finally { + gate.leave(prev); } - - return new CacheMetricsSnapshot(metrics); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e5eeefd7/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryMetricsProvider.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryMetricsProvider.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryMetricsProvider.java index eb1deb9..c2bdc53 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryMetricsProvider.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryMetricsProvider.java @@ -45,5 +45,5 @@ public interface DiscoveryMetricsProvider { * * @return metrics data about all caches on local node. */ - public Map<String, CacheMetrics> cacheMetrics(); + public Map<Integer, CacheMetrics> cacheMetrics(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e5eeefd7/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java index 9cfee1a..a42730f 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java @@ -1113,7 +1113,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp } if (msg.hasCacheMetrics()) { - for (Map.Entry<UUID, Map<String, CacheMetrics>> e : msg.cacheMetrics().entrySet()) + for (Map.Entry<UUID, Map<Integer, CacheMetrics>> e : msg.cacheMetrics().entrySet()) 
updateCacheMetrics(e.getKey(), e.getValue(), tstamp); } } @@ -1185,7 +1185,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp * @param cacheMetrics Cache metrics. * @param tstamp Timestamp. */ - private void updateCacheMetrics(UUID nodeId, Map<String, CacheMetrics> cacheMetrics, long tstamp) { + private void updateCacheMetrics(UUID nodeId, Map<Integer, CacheMetrics> cacheMetrics, long tstamp) { assert nodeId != null; assert cacheMetrics != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e5eeefd7/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 856ede2..1b0bf64 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -4374,7 +4374,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov } if (msg.hasCacheMetrics()) { - for (Map.Entry<UUID, Map<String, CacheMetrics>> e : msg.cacheMetrics().entrySet()) + for (Map.Entry<UUID, Map<Integer, CacheMetrics>> e : msg.cacheMetrics().entrySet()) updateCacheMetrics(e.getKey(), e.getValue(), tstamp); } } @@ -4458,7 +4458,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov * @param cacheMetrics Cache metrics. * @param tstamp Timestamp. 
*/ - private void updateCacheMetrics(UUID nodeId, Map<String, CacheMetrics> cacheMetrics, long tstamp) { + private void updateCacheMetrics(UUID nodeId, Map<Integer, CacheMetrics> cacheMetrics, long tstamp) { assert nodeId != null; assert cacheMetrics != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e5eeefd7/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java index 7c4d17b..3cafd4f 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java @@ -76,7 +76,7 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste /** Node cache metrics. */ @GridToStringExclude - private volatile Map<String, CacheMetrics> cacheMetrics; + private volatile Map<Integer, CacheMetrics> cacheMetrics; /** Node order in the topology. */ private volatile long order; @@ -215,7 +215,7 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste } /** {@inheritDoc} */ - @Override public Map<String, CacheMetrics> cacheMetrics() { + @Override public Map<Integer, CacheMetrics> cacheMetrics() { if (metricsProvider != null) cacheMetrics = metricsProvider.cacheMetrics(); @@ -227,7 +227,7 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste * * @param cacheMetrics Cache metrics. 
*/ - public void setCacheMetrics(Map<String, CacheMetrics> cacheMetrics) { + public void setCacheMetrics(Map<Integer, CacheMetrics> cacheMetrics) { assert cacheMetrics != null; this.cacheMetrics = cacheMetrics; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e5eeefd7/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java index c1e434e..ee81f45 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java @@ -56,7 +56,7 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage { /** Cahce metrics by node. */ @GridToStringExclude - private Map<UUID, Map<String, CacheMetrics>> cacheMetrics; + private Map<UUID, Map<Integer, CacheMetrics>> cacheMetrics; /** * Public default no-arg constructor for {@link Externalizable} interface. @@ -98,7 +98,7 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage { * @param nodeId Node ID. * @param metrics Node cache metrics. */ - public void setCacheMetrics(UUID nodeId, Map<String, CacheMetrics> metrics) { + public void setCacheMetrics(UUID nodeId, Map<Integer, CacheMetrics> metrics) { assert nodeId != null; assert metrics != null; assert !this.cacheMetrics.containsKey(nodeId); @@ -158,7 +158,7 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage { * * @return Cache metrics map. 
*/ - public Map<UUID, Map<String, CacheMetrics>> cacheMetrics() { + public Map<UUID, Map<Integer, CacheMetrics>> cacheMetrics() { return cacheMetrics; } @@ -230,14 +230,14 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage { out.writeInt(cacheMetrics.size()); if (!cacheMetrics.isEmpty()) { - for (Map.Entry<UUID, Map<String, CacheMetrics>> e : cacheMetrics.entrySet()) { + for (Map.Entry<UUID, Map<Integer, CacheMetrics>> e : cacheMetrics.entrySet()) { U.writeUuid(out, e.getKey()); - Map<String, CacheMetrics> ms = e.getValue(); + Map<Integer, CacheMetrics> ms = e.getValue(); out.writeInt(ms == null ? 0 : ms.size()); - for (Map.Entry<String, CacheMetrics> m : ms.entrySet()) + for (Map.Entry<Integer, CacheMetrics> m : ms.entrySet()) out.writeObject(m.getValue()); } } @@ -265,11 +265,11 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage { int size = in.readInt(); - Map<String, CacheMetrics> ms = U.newHashMap(size); + Map<Integer, CacheMetrics> ms = U.newHashMap(size); for (int j = 0; j < size; j++) { CacheMetricsSnapshot m = (CacheMetricsSnapshot) in.readObject(); - ms.put(m.name(), m); + ms.put(m.id(), m); } cacheMetrics.put(uuid, ms); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e5eeefd7/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java index 5f47d5e..5025743 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java @@ -376,7 +376,7 @@ public class GridDiscoverySelfTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ - @Override public Map<String, CacheMetrics> cacheMetrics() { + @Override 
public Map<Integer, CacheMetrics> cacheMetrics() { return null; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e5eeefd7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java new file mode 100644 index 0000000..a3ffca9 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; + +/** + * Test for cluster wide cache metrics. + */ +public class CacheMetricsForClusterGroupSelfTest extends GridCacheAbstractSelfTest { + /** Grid count. */ + private static final int GRID_CNT = 3; + + /** Cache 1. 
*/ + private static final String CACHE1 = "cache1"; + + /** Cache 2. */ + private static final String CACHE2 = "cache2"; + + /** Entry count cache 1. */ + private static final int ENTRY_CNT_CACHE1 = 1000; + + /** Entry count cache 2. */ + private static final int ENTRY_CNT_CACHE2 = 500; + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return GRID_CNT; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + CacheConfiguration ccfg1 = defaultCacheConfiguration(); + ccfg1.setName(CACHE1); + ccfg1.setStatisticsEnabled(true); + + CacheConfiguration ccfg2 = defaultCacheConfiguration(); + ccfg2.setName(CACHE2); + ccfg2.setStatisticsEnabled(true); + + grid(0).getOrCreateCache(ccfg1); + grid(0).getOrCreateCache(ccfg2); + } + + /** + * Test cluster group metrics. + */ + public void testMetrics() throws Exception { + populateCacheData(CACHE1, ENTRY_CNT_CACHE1); + populateCacheData(CACHE2, ENTRY_CNT_CACHE2); + + readCacheData(CACHE1, ENTRY_CNT_CACHE1); + readCacheData(CACHE2, ENTRY_CNT_CACHE2); + + // Wait for heartbeat message + Thread.sleep(3000); + + assertMetrics(CACHE1); + assertMetrics(CACHE2); + } + + /** + * @param name Name. + * @param cnt Count. + */ + private void populateCacheData(String name, int cnt) { + IgniteCache<Integer, Integer> cache = grid(0).cache(name); + + for (int i = 0; i < cnt; i++) + cache.put(i, i); + } + + /** + * @param name Name. + * @param cnt Count. + */ + private void readCacheData(String name, int cnt) { + IgniteCache<Integer, Integer> cache = grid(0).cache(name); + + for (int i = 0; i < cnt; i++) + cache.get(i); + } + + /** + * @param name Name. 
+ */ + private void assertMetrics(String name) { + CacheMetrics metrics = grid(0).cache(name).metrics(grid(0).cluster().forCacheNodes(name)); + + CacheMetrics[] ms = new CacheMetrics[gridCount()]; + + for (int i = 0; i < gridCount(); i++) + ms[i] = grid(i).cache(name).metrics(); + + // Static metrics + for (int i = 0; i < gridCount(); i++) + assertEquals(metrics.id(), ms[i].id()); + + for (int i = 0; i < gridCount(); i++) + assertEquals(metrics.name(), ms[i].name()); + + // Dynamic metrics + assertEquals(metrics.getCacheGets(), sum(ms, new IgniteClosure<CacheMetrics, Long>() { + @Override public Long apply(CacheMetrics input) { + return input.getCacheGets(); + } + })); + + assertEquals(metrics.getCachePuts(), sum(ms, new IgniteClosure<CacheMetrics, Long>() { + @Override public Long apply(CacheMetrics input) { + return input.getCachePuts(); + } + })); + + assertEquals(metrics.getCacheHits(), sum(ms, new IgniteClosure<CacheMetrics, Long>() { + @Override public Long apply(CacheMetrics input) { + return input.getCacheHits(); + } + })); + } + + /** + * @param ms Metrics. + * @param f Function. 
+ */ + private long sum(CacheMetrics[] ms, IgniteClosure<CacheMetrics, Long> f) { + long res = 0; + + for (int i = 0; i < gridCount(); i++) + res += f.apply(ms[i]); + + return res; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e5eeefd7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedMetricsForClusterGroupTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedMetricsForClusterGroupTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedMetricsForClusterGroupTest.java deleted file mode 100644 index dce9dda..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedMetricsForClusterGroupTest.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.cache.distributed.dht; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.processors.cache.*; - -import static org.apache.ignite.cache.CacheMode.*; -import static org.apache.ignite.cache.CacheRebalanceMode.*; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; - -/** - * - */ -public class GridCachePartitionedMetricsForClusterGroupTest extends GridCacheAbstractMetricsSelfTest { - /** Grid count. */ - private static final int GRID_CNT = 3; - public static final String CACHE_1 = "cache1"; - - /** {@inheritDoc} */ - @Override protected int gridCount() { - return GRID_CNT; - } - - @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { - CacheConfiguration ccfg = super.cacheConfiguration(gridName); - - ccfg.setBackups(0); - - return ccfg; - } - - public void testMetrics() throws Exception { - IgniteCache<Integer, Integer> cache = grid(0).cache(null); - - for (int i = 0; i < 1000; i++) - cache.put(i, i); - - for (int i = 0; i < GRID_CNT; i++) { - IgniteCache<Integer, Integer> c = grid(i).cache(null); - System.out.println("!!! 
Grid(" + i + "), puts = " + c.metrics().getCachePuts() + ", size = " + c.size()); - } - - Thread.sleep(10000); - - CacheMetrics metrics = cache.metrics(grid(0).cluster().forRemotes()); - - System.out.println(metrics); -/* - assertEquals(1, cache.metrics().getCachePuts()); - assertEquals(0, grid(1).cache(null).metrics().getCachePuts()); - assertEquals(1, grid(2).cache(null).metrics().getCachePuts()); -*/ - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e5eeefd7/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PClassLoadingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PClassLoadingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PClassLoadingSelfTest.java index 6622288..bd8748e 100644 --- a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PClassLoadingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PClassLoadingSelfTest.java @@ -116,7 +116,7 @@ public class GridP2PClassLoadingSelfTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ - @Override public Map<String, CacheMetrics> cacheMetrics() { + @Override public Map<Integer, CacheMetrics> cacheMetrics() { return null; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e5eeefd7/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java index aed66ad..a77fbfa 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java @@ -57,7 +57,7 @@ public class GridTestNode extends GridMetadataAwareAdapter implements ClusterNod private ClusterMetrics metrics; /** */ 
- private Map<String, CacheMetrics> cacheMetrics = Collections.emptyMap(); + private Map<Integer, CacheMetrics> cacheMetrics = Collections.emptyMap(); /** */ private long order; @@ -190,7 +190,7 @@ public class GridTestNode extends GridMetadataAwareAdapter implements ClusterNod return metrics; } - @Override public Map<String, CacheMetrics> cacheMetrics() { + @Override public Map<Integer, CacheMetrics> cacheMetrics() { return cacheMetrics; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e5eeefd7/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java index 2655b9d..08bb1ab 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java @@ -352,8 +352,8 @@ public abstract class GridSpiAbstractTest<T extends IgniteSpi> extends GridAbstr @Override public ClusterMetrics metrics() { return new ClusterMetricsSnapshot(); } /** {@inheritDoc} */ - @Override public Map<String, CacheMetrics> cacheMetrics() { - return Collections.<String, CacheMetrics>singletonMap(null, new CacheMetricsSnapshot()); + @Override public Map<Integer, CacheMetrics> cacheMetrics() { + return Collections.<Integer, CacheMetrics>singletonMap(null, new CacheMetricsSnapshot()); } }; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e5eeefd7/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java index 511afec..1adf55f 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java @@ -18,6 +18,7 @@ package org.apache.ignite.testsuites; import junit.framework.*; +import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; import org.apache.ignite.internal.processors.cache.distributed.replicated.*; import org.apache.ignite.internal.processors.cache.local.*; @@ -47,6 +48,9 @@ public class IgniteCacheMetricsSelfTestSuite extends TestSuite { suite.addTestSuite(GridCacheAtomicPartitionedTckMetricsSelfTestImpl.class); suite.addTestSuite(GridCacheAtomicLocalTckMetricsSelfTestImpl.class); + // Cluster wide metrics. + suite.addTestSuite(CacheMetricsForClusterGroupSelfTest.class); + return suite; } }
