ignite-366 Metrics for caches should work in clustered mode
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b73fb49c Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b73fb49c Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b73fb49c Branch: refs/heads/ignite-366 Commit: b73fb49c3ebdf1c552b12e84e492a3db21e4d52a Parents: b4c51f4 Author: Andrey Gura <ag...@gridgain.com> Authored: Thu Apr 2 22:10:23 2015 +0300 Committer: Andrey Gura <ag...@gridgain.com> Committed: Thu Apr 2 22:18:39 2015 +0300 ---------------------------------------------------------------------- .../java/org/apache/ignite/IgniteCache.java | 8 + .../org/apache/ignite/cluster/ClusterNode.java | 2 +- .../discovery/GridDiscoveryManager.java | 8 +- .../processors/cache/CacheMetricsSnapshot.java | 269 ++++++++++++++++++- .../processors/cache/IgniteCacheProxy.java | 15 ++ .../spi/discovery/DiscoveryMetricsProvider.java | 2 +- .../discovery/tcp/TcpClientDiscoverySpi.java | 5 +- .../spi/discovery/tcp/TcpDiscoverySpi.java | 45 +++- .../tcp/internal/TcpDiscoveryNode.java | 6 +- .../messages/TcpDiscoveryHeartbeatMessage.java | 41 ++- .../ignite/internal/GridDiscoverySelfTest.java | 2 +- ...hePartitionedMetricsForClusterGroupTest.java | 72 +++++ .../ignite/p2p/GridP2PClassLoadingSelfTest.java | 2 +- .../ignite/testframework/GridTestNode.java | 4 +- .../junits/spi/GridSpiAbstractTest.java | 4 +- 15 files changed, 459 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b73fb49c/modules/core/src/main/java/org/apache/ignite/IgniteCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java index 5c5bb25..d133ab7 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java @@ -22,6 +22,7 @@ import org.apache.ignite.cache.affinity.*; import org.apache.ignite.cache.affinity.rendezvous.*; import org.apache.ignite.cache.query.*; import org.apache.ignite.cache.store.*; +import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.lang.*; import org.apache.ignite.mxbean.*; @@ -514,6 +515,13 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS public CacheMetrics metrics(); /** + * Gets snapshot metrics for caches in cluster group. + * @param grp Cluster group. + * @return Cache metrics. + */ + public CacheMetrics metrics(ClusterGroup grp); + + /** * Gets MxBean for this cache. * * @return MxBean. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b73fb49c/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java index 1c2e829..bc1636e 100644 --- a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java +++ b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java @@ -159,7 +159,7 @@ public interface ClusterNode { * * @return Runtime metrics snapshots for this node. */ - public Collection<CacheMetrics> cacheMetrics(); + public Map<String, CacheMetrics> cacheMetrics(); /** * Gets all node attributes. 
Attributes are assigned to nodes at startup http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b73fb49c/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 5fb8616..76afa73 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -648,16 +648,16 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** {@inheritDoc} */ - @Override public Collection<CacheMetrics> cacheMetrics() { + @Override public Map<String, CacheMetrics> cacheMetrics() { Collection<GridCache<?, ?>> caches = ctx.cache().caches(); if (F.isEmpty(caches)) - return Collections.emptyList(); + return Collections.emptyMap(); - List<CacheMetrics> metrics = new ArrayList<>(caches.size()); + Map<String, CacheMetrics> metrics = U.newHashMap(caches.size()); for (GridCache<?, ?> cache : caches) - metrics.add(cache.metrics()); + metrics.put(cache.name(), cache.metrics()); return metrics; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b73fb49c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java index e20259f..4e6a256 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java @@ -20,10 +20,16 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.cache.*; import org.apache.ignite.internal.util.typedef.internal.*; +import java.io.*; +import java.util.*; + /** * Metrics snapshot. */ -public class CacheMetricsSnapshot implements CacheMetrics { +public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { + /** Last update time. */ + private long lastUpdateTime = -1; + /** Number of reads. */ private long reads = 0; @@ -246,6 +252,131 @@ public class CacheMetricsSnapshot implements CacheMetrics { isWriteThrough = m.isWriteThrough(); } + /** + * Constructs merged cachemetrics. + * + * @param metrics Metrics for merge. + */ + public CacheMetricsSnapshot(Collection<CacheMetrics> metrics) { + A.notEmpty(metrics, "metrics"); + + // TODO: Only once + /* + cacheName + isEmpty + isWriteBehindEnabled + writeBehindFlushSize + writeBehindFlushThreadCount + writeBehindFlushFrequency + writeBehindStoreBatchSize + writeBehindTotalCriticalOverflowCount + writeBehindCriticalOverflowCount + writeBehindErrorRetryCount + writeBehindBufferSize + + keyType + valueType + isStoreByValue + isStatisticsEnabled + isManagementEnabled + isReadThrough + isWriteThrough + */ + + //TODO: How to merge? + /* + overflowSize + size (only once PRIMARY?) 
+ */ + + boolean first = true; + + for (CacheMetrics e : metrics) { + + if (first) { + cacheName = e.name(); + isEmpty = e.isEmpty(); + isWriteBehindEnabled = e.isWriteBehindEnabled(); + writeBehindFlushSize = e.getWriteBehindFlushSize(); + writeBehindFlushThreadCount = e.getWriteBehindFlushThreadCount(); + writeBehindFlushFrequency = e.getWriteBehindFlushFrequency(); + writeBehindStoreBatchSize = e.getWriteBehindStoreBatchSize(); + writeBehindTotalCriticalOverflowCount = e.getWriteBehindTotalCriticalOverflowCount(); + writeBehindCriticalOverflowCount = e.getWriteBehindCriticalOverflowCount(); + writeBehindErrorRetryCount = e.getWriteBehindErrorRetryCount(); + writeBehindBufferSize = e.getWriteBehindBufferSize(); + + keyType = e.getKeyType(); + valueType = e.getValueType(); + isStoreByValue = e.isStoreByValue(); + isStatisticsEnabled = e.isStatisticsEnabled(); + isManagementEnabled = e.isManagementEnabled(); + isReadThrough = e.isReadThrough(); + isWriteThrough = e.isWriteThrough(); + + first = false; + } + + reads += e.getCacheGets(); + puts += e.getCachePuts(); + hits += e.getCacheHits(); + misses += e.getCacheMisses(); + txCommits += e.getCacheTxCommits(); + txRollbacks += e.getCacheTxRollbacks(); + evicts += e.getCacheEvictions(); + removes += e.getCacheRemovals(); + + putAvgTimeNanos += e.getAveragePutTime(); + getAvgTimeNanos += e.getAverageGetTime(); + removeAvgTimeNanos += e.getAverageRemoveTime(); + commitAvgTimeNanos += e.getAverageTxCommitTime(); + rollbackAvgTimeNanos += e.getAverageTxRollbackTime(); + + offHeapEntriesCount += e.getOffHeapEntriesCount(); + offHeapAllocatedSize += e.getOffHeapAllocatedSize(); + keySize += e.getKeySize(); + dhtEvictQueueCurrentSize += e.getDhtEvictQueueCurrentSize(); + txThreadMapSize += e.getTxThreadMapSize(); + txXidMapSize += e.getTxXidMapSize(); + txCommitQueueSize += e.getTxCommitQueueSize(); + txPrepareQueueSize += e.getTxPrepareQueueSize(); + txStartVersionCountsSize += e.getTxStartVersionCountsSize(); + 
txCommittedVersionsSize += e.getTxCommittedVersionsSize(); + txRolledbackVersionsSize += e.getTxRolledbackVersionsSize(); + txDhtThreadMapSize += e.getTxDhtThreadMapSize(); + txDhtXidMapSize += e.getTxDhtXidMapSize(); + txDhtCommitQueueSize += e.getTxDhtCommitQueueSize(); + txDhtPrepareQueueSize += e.getTxDhtPrepareQueueSize(); + txDhtStartVersionCountsSize += e.getTxDhtStartVersionCountsSize(); + txDhtCommittedVersionsSize += e.getTxDhtCommittedVersionsSize(); + txDhtRolledbackVersionsSize += e.getTxDhtRolledbackVersionsSize(); + } + + putAvgTimeNanos /= metrics.size(); + getAvgTimeNanos /= metrics.size(); + removeAvgTimeNanos /= metrics.size(); + commitAvgTimeNanos /= metrics.size(); + rollbackAvgTimeNanos /= metrics.size(); + } + + /** + * Gets last update time of this node metrics. + * + * @return Last update time. + */ + public long getLastUpdateTime() { + return lastUpdateTime; + } + + /** + * Sets last update time. + * + * @param lastUpdateTime Last update time. + */ + public void setLastUpdateTime(long lastUpdateTime) { + this.lastUpdateTime = lastUpdateTime; + } + /** {@inheritDoc} */ @Override public long getCacheHits() { return hits; @@ -266,9 +397,8 @@ public class CacheMetricsSnapshot implements CacheMetrics { /** {@inheritDoc} */ @Override public float getCacheMissPercentage() { - if (misses == 0 || reads == 0) { + if (misses == 0 || reads == 0) return 0; - } return (float) misses / reads * 100.0f; } @@ -283,6 +413,14 @@ public class CacheMetricsSnapshot implements CacheMetrics { return puts; } + /** + * Sets the total number of puts to the cache. + * @param puts The total number of puts to the cache. + */ + public void setCachePuts(long puts) { + this.puts = puts; + } + /** {@inheritDoc} */ @Override public long getCacheRemovals() { return removes; @@ -303,6 +441,15 @@ public class CacheMetricsSnapshot implements CacheMetrics { return putAvgTimeNanos; } + /** + * Sets the mean time to execute puts. + * + * @param putAvgTimeNanos The time in µs. 
+ */ + public void setAveragePutTime(float putAvgTimeNanos) { + this.putAvgTimeNanos = putAvgTimeNanos; + } + /** {@inheritDoc} */ @Override public float getAverageRemoveTime() { return removeAvgTimeNanos; @@ -522,4 +669,120 @@ public class CacheMetricsSnapshot implements CacheMetrics { @Override public String toString() { return S.toString(CacheMetricsSnapshot.class, this); } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeLong(reads); + out.writeLong(puts); + out.writeLong(hits); + out.writeLong(misses); + out.writeLong(txCommits); + out.writeLong(txRollbacks); + out.writeLong(evicts); + out.writeLong(removes); + + out.writeFloat(putAvgTimeNanos); + out.writeFloat(getAvgTimeNanos); + out.writeFloat(removeAvgTimeNanos); + out.writeFloat(commitAvgTimeNanos); + out.writeFloat(rollbackAvgTimeNanos); + + U.writeUTFStringNullable(out, cacheName); + out.writeLong(overflowSize); + out.writeLong(offHeapEntriesCount); + out.writeLong(offHeapAllocatedSize); + out.writeInt(size); + out.writeInt(keySize); + out.writeBoolean(isEmpty); + out.writeInt(dhtEvictQueueCurrentSize); + out.writeInt(txThreadMapSize); + out.writeInt(txXidMapSize); + out.writeInt(txCommitQueueSize); + out.writeInt(txPrepareQueueSize); + out.writeInt(txStartVersionCountsSize); + out.writeInt(txCommittedVersionsSize); + out.writeInt(txRolledbackVersionsSize); + out.writeInt(txDhtThreadMapSize); + out.writeInt(txDhtXidMapSize); + out.writeInt(txDhtCommitQueueSize); + out.writeInt(txDhtPrepareQueueSize); + out.writeInt(txDhtStartVersionCountsSize); + out.writeInt(txDhtCommittedVersionsSize); + out.writeInt(txDhtRolledbackVersionsSize); + out.writeBoolean(isWriteBehindEnabled); + out.writeInt(writeBehindFlushSize); + out.writeInt(writeBehindFlushThreadCount); + out.writeLong(writeBehindFlushFrequency); + out.writeInt(writeBehindStoreBatchSize); + out.writeInt(writeBehindTotalCriticalOverflowCount); + 
out.writeInt(writeBehindCriticalOverflowCount); + out.writeInt(writeBehindErrorRetryCount); + out.writeInt(writeBehindBufferSize); + + out.writeUTF(keyType); + out.writeUTF(valueType); + out.writeBoolean(isStoreByValue); + out.writeBoolean(isStatisticsEnabled); + out.writeBoolean(isManagementEnabled); + out.writeBoolean(isReadThrough); + out.writeBoolean(isWriteThrough); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + reads = in.readLong(); + puts = in.readLong(); + hits = in.readLong(); + misses = in.readLong(); + txCommits = in.readLong(); + txRollbacks = in.readLong(); + evicts = in.readLong(); + removes = in.readLong(); + + putAvgTimeNanos = in.readFloat(); + getAvgTimeNanos = in.readFloat(); + removeAvgTimeNanos = in.readFloat(); + commitAvgTimeNanos = in.readFloat(); + rollbackAvgTimeNanos = in.readFloat(); + + cacheName = U.readUTFStringNullable(in); + overflowSize = in.readLong(); + offHeapEntriesCount = in.readLong(); + offHeapAllocatedSize = in.readLong(); + size = in.readInt(); + keySize = in.readInt(); + isEmpty = in.readBoolean(); + dhtEvictQueueCurrentSize = in.readInt(); + txThreadMapSize = in.readInt(); + txXidMapSize = in.readInt(); + txCommitQueueSize = in.readInt(); + txPrepareQueueSize = in.readInt(); + txStartVersionCountsSize = in.readInt(); + txCommittedVersionsSize = in.readInt(); + txRolledbackVersionsSize = in.readInt(); + txDhtThreadMapSize = in.readInt(); + txDhtXidMapSize = in.readInt(); + txDhtCommitQueueSize = in.readInt(); + txDhtPrepareQueueSize = in.readInt(); + txDhtStartVersionCountsSize = in.readInt(); + txDhtCommittedVersionsSize = in.readInt(); + txDhtRolledbackVersionsSize = in.readInt(); + isWriteBehindEnabled = in.readBoolean(); + writeBehindFlushSize = in.readInt(); + writeBehindFlushThreadCount = in.readInt(); + writeBehindFlushFrequency = in.readLong(); + writeBehindStoreBatchSize = in.readInt(); + writeBehindTotalCriticalOverflowCount = 
in.readInt(); + writeBehindCriticalOverflowCount = in.readInt(); + writeBehindErrorRetryCount = in.readInt(); + writeBehindBufferSize = in.readInt(); + + keyType = in.readUTF(); + valueType = in.readUTF(); + isStoreByValue = in.readBoolean(); + isStatisticsEnabled = in.readBoolean(); + isManagementEnabled = in.readBoolean(); + isReadThrough = in.readBoolean(); + isWriteThrough = in.readBoolean(); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b73fb49c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index dfc3ef4..6b7d709 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -140,6 +140,21 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ + @Override public CacheMetrics metrics(ClusterGroup grp) { + List<CacheMetrics> metrics = new ArrayList<>(grp.nodes().size()); + + for (ClusterNode node : grp.nodes()) + metrics.add(node.cacheMetrics().get(getName())); + + if (F.isEmpty(metrics)) { + //TODO: what actually need to be returned? 
+ return new CacheMetricsSnapshot(); + } + + return new CacheMetricsSnapshot(metrics); + } + + /** {@inheritDoc} */ @Override public CacheMetricsMXBean mxBean() { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b73fb49c/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryMetricsProvider.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryMetricsProvider.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryMetricsProvider.java index f3c98de..eb1deb9 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryMetricsProvider.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryMetricsProvider.java @@ -45,5 +45,5 @@ public interface DiscoveryMetricsProvider { * * @return metrics data about all caches on local node. */ - public Collection<CacheMetrics> cacheMetrics(); + public Map<String, CacheMetrics> cacheMetrics(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b73fb49c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java index a3bff02..9cfee1a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java @@ -1113,7 +1113,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp } if (msg.hasCacheMetrics()) { - for (Map.Entry<UUID,Collection<CacheMetrics>> e : msg.cacheMetrics().entrySet()) + for (Map.Entry<UUID, Map<String, CacheMetrics>> 
e : msg.cacheMetrics().entrySet()) updateCacheMetrics(e.getKey(), e.getValue(), tstamp); } } @@ -1185,7 +1185,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp * @param cacheMetrics Cache metrics. * @param tstamp Timestamp. */ - private void updateCacheMetrics(UUID nodeId, Collection<CacheMetrics> cacheMetrics, long tstamp) { + private void updateCacheMetrics(UUID nodeId, Map<String, CacheMetrics> cacheMetrics, long tstamp) { assert nodeId != null; assert cacheMetrics != null; @@ -1202,7 +1202,6 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp log.debug("Received cacheMetrics from unknown node: " + nodeId); } - /** * @param topVer New topology version. * @return Latest topology snapshot. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b73fb49c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index d5c05f7..856ede2 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -18,6 +18,7 @@ package org.apache.ignite.spi.discovery.tcp; import org.apache.ignite.*; +import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.events.DiscoveryEvent; @@ -4351,7 +4352,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov return; } - if (locNodeId.equals(msg.creatorNodeId()) && !msg.hasMetrics(locNodeId) && msg.senderNodeId() != null) { + if (locNodeId.equals(msg.creatorNodeId()) && !hasMetrics(msg, locNodeId) && msg.senderNodeId() != null) { if (log.isDebugEnabled()) 
log.debug("Discarding heartbeat message that has made two passes: " + msg); @@ -4371,14 +4372,21 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov updateMetrics(t.get1(), t.get2(), tstamp); } } + + if (msg.hasCacheMetrics()) { + for (Map.Entry<UUID, Map<String, CacheMetrics>> e : msg.cacheMetrics().entrySet()) + updateCacheMetrics(e.getKey(), e.getValue(), tstamp); + } } if (ring.hasRemoteNodes()) { if ((locNodeId.equals(msg.creatorNodeId()) && msg.senderNodeId() == null || - !msg.hasMetrics(locNodeId)) && spiStateCopy() == CONNECTED) { + !hasMetrics(msg, locNodeId)) && spiStateCopy() == CONNECTED) { // Message is on its first ring or just created on coordinator. msg.setMetrics(locNodeId, metricsProvider.metrics()); + msg.setCacheMetrics(locNodeId, metricsProvider.cacheMetrics()); + for (Map.Entry<UUID, ClientMessageWorker> e : clientMsgWorkers.entrySet()) { UUID nodeId = e.getKey(); ClusterMetrics metrics = e.getValue().metrics(); @@ -4393,6 +4401,8 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov // Message is on its second ring. msg.removeMetrics(locNodeId); + msg.removeCacheMetrics(locNodeId); + Collection<UUID> clientNodeIds = msg.clientNodeIds(); for (TcpDiscoveryNode clientNode : ring.clientNodes()) { @@ -4444,6 +4454,35 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov } /** + * @param nodeId Node ID. + * @param cacheMetrics Cache metrics. + * @param tstamp Timestamp. 
+ */ + private void updateCacheMetrics(UUID nodeId, Map<String, CacheMetrics> cacheMetrics, long tstamp) { + assert nodeId != null; + assert cacheMetrics != null; + + TcpDiscoveryNode node = ring.node(nodeId); + + if (node != null && node.visible()) { + node.setCacheMetrics(cacheMetrics); + + node.lastUpdateTime(tstamp); + + notifyDiscovery(EVT_NODE_METRICS_UPDATED, ring.topologyVersion(), node); + } + else if (log.isDebugEnabled()) + log.debug("Received cacheMetrics from unknown node: " + nodeId); + } + + /** + * @param msg Message. + */ + private boolean hasMetrics(TcpDiscoveryHeartbeatMessage msg, UUID nodeId) { + return msg.hasMetrics(nodeId) || msg.hasCacheMetrics(nodeId); + } + + /** * Processes discard message and discards previously registered pending messages. * * @param msg Discard message. @@ -5226,8 +5265,10 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov metrics = hbMsg.metrics().get(nodeId).metrics(); hbMsg.removeMetrics(nodeId); + hbMsg.removeCacheMetrics(nodeId); assert !hbMsg.hasMetrics(); + assert !hbMsg.hasCacheMetrics(); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b73fb49c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java index 89924fb..7c4d17b 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java @@ -76,7 +76,7 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste /** Node cache metrics. 
*/ @GridToStringExclude - private volatile Collection<CacheMetrics> cacheMetrics; + private volatile Map<String, CacheMetrics> cacheMetrics; /** Node order in the topology. */ private volatile long order; @@ -215,7 +215,7 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste } /** {@inheritDoc} */ - @Override public Collection<CacheMetrics> cacheMetrics() { + @Override public Map<String, CacheMetrics> cacheMetrics() { if (metricsProvider != null) cacheMetrics = metricsProvider.cacheMetrics(); @@ -227,7 +227,7 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste * * @param cacheMetrics Cache metrics. */ - public void setCacheMetrics(Collection<CacheMetrics> cacheMetrics) { + public void setCacheMetrics(Map<String, CacheMetrics> cacheMetrics) { assert cacheMetrics != null; this.cacheMetrics = cacheMetrics; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b73fb49c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java index 095edb7..c1e434e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java @@ -20,6 +20,7 @@ package org.apache.ignite.spi.discovery.tcp.messages; import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; 
@@ -55,7 +56,7 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage { /** Cahce metrics by node. */ @GridToStringExclude - private Map<UUID, Collection<CacheMetrics>> cacheMetrics; + private Map<UUID, Map<String, CacheMetrics>> cacheMetrics; /** * Public default no-arg constructor for {@link Externalizable} interface. @@ -97,7 +98,7 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage { * @param nodeId Node ID. * @param metrics Node cache metrics. */ - public void setCacheMetrics(UUID nodeId, Collection<CacheMetrics> metrics) { + public void setCacheMetrics(UUID nodeId, Map<String, CacheMetrics> metrics) { assert nodeId != null; assert metrics != null; assert !this.cacheMetrics.containsKey(nodeId); @@ -157,7 +158,7 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage { * * @return Cache metrics map. */ - public Map<UUID, Collection<CacheMetrics>> cacheMetrics() { + public Map<UUID, Map<String, CacheMetrics>> cacheMetrics() { return cacheMetrics; } @@ -226,6 +227,21 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage { } } + out.writeInt(cacheMetrics.size()); + + if (!cacheMetrics.isEmpty()) { + for (Map.Entry<UUID, Map<String, CacheMetrics>> e : cacheMetrics.entrySet()) { + U.writeUuid(out, e.getKey()); + + Map<String, CacheMetrics> ms = e.getValue(); + + out.writeInt(ms == null ? 
0 : ms.size()); + + for (Map.Entry<String, CacheMetrics> m : ms.entrySet()) + out.writeObject(m.getValue()); + } + } + U.writeCollection(out, clientNodeIds); } @@ -240,6 +256,25 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage { for (int i = 0; i < metricsSize; i++) metrics.put(U.readUuid(in), (MetricsSet)in.readObject()); + int cacheMetricsSize = in.readInt(); + + cacheMetrics = U.newHashMap(cacheMetricsSize); + + for (int i = 0; i < cacheMetricsSize; i++) { + UUID uuid = U.readUuid(in); + + int size = in.readInt(); + + Map<String, CacheMetrics> ms = U.newHashMap(size); + + for (int j = 0; j < size; j++) { + CacheMetricsSnapshot m = (CacheMetricsSnapshot) in.readObject(); + ms.put(m.name(), m); + } + + cacheMetrics.put(uuid, ms); + } + clientNodeIds = U.readCollection(in); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b73fb49c/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java index 3c2ccbb..5f47d5e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java @@ -376,7 +376,7 @@ public class GridDiscoverySelfTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ - @Override public Collection<CacheMetrics> cacheMetrics() { + @Override public Map<String, CacheMetrics> cacheMetrics() { return null; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b73fb49c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedMetricsForClusterGroupTest.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedMetricsForClusterGroupTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedMetricsForClusterGroupTest.java new file mode 100644 index 0000000..dce9dda --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedMetricsForClusterGroupTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.processors.cache.*; + +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CacheRebalanceMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; + +/** + * + */ +public class GridCachePartitionedMetricsForClusterGroupTest extends GridCacheAbstractMetricsSelfTest { + /** Grid count. 
*/ + private static final int GRID_CNT = 3; + public static final String CACHE_1 = "cache1"; + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return GRID_CNT; + } + + @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { + CacheConfiguration ccfg = super.cacheConfiguration(gridName); + + ccfg.setBackups(0); + + return ccfg; + } + + public void testMetrics() throws Exception { + IgniteCache<Integer, Integer> cache = grid(0).cache(null); + + for (int i = 0; i < 1000; i++) + cache.put(i, i); + + for (int i = 0; i < GRID_CNT; i++) { + IgniteCache<Integer, Integer> c = grid(i).cache(null); + System.out.println("!!! Grid(" + i + "), puts = " + c.metrics().getCachePuts() + ", size = " + c.size()); + } + + Thread.sleep(10000); + + CacheMetrics metrics = cache.metrics(grid(0).cluster().forRemotes()); + + System.out.println(metrics); +/* + assertEquals(1, cache.metrics().getCachePuts()); + assertEquals(0, grid(1).cache(null).metrics().getCachePuts()); + assertEquals(1, grid(2).cache(null).metrics().getCachePuts()); +*/ + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b73fb49c/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PClassLoadingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PClassLoadingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PClassLoadingSelfTest.java index d9ffc8e..6622288 100644 --- a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PClassLoadingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PClassLoadingSelfTest.java @@ -116,7 +116,7 @@ public class GridP2PClassLoadingSelfTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ - @Override public Collection<CacheMetrics> cacheMetrics() { + @Override public Map<String, CacheMetrics> cacheMetrics() { return null; } 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b73fb49c/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java index e5eea88..aed66ad 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java @@ -57,7 +57,7 @@ public class GridTestNode extends GridMetadataAwareAdapter implements ClusterNod private ClusterMetrics metrics; /** */ - private Collection<CacheMetrics> cacheMetrics = Collections.emptyList(); + private Map<String, CacheMetrics> cacheMetrics = Collections.emptyMap(); /** */ private long order; @@ -190,7 +190,7 @@ public class GridTestNode extends GridMetadataAwareAdapter implements ClusterNod return metrics; } - @Override public Collection<CacheMetrics> cacheMetrics() { + @Override public Map<String, CacheMetrics> cacheMetrics() { return cacheMetrics; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b73fb49c/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java index 4d65587..2655b9d 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java @@ -352,8 +352,8 @@ public abstract class GridSpiAbstractTest<T extends IgniteSpi> extends GridAbstr @Override public ClusterMetrics metrics() { return new 
ClusterMetricsSnapshot(); } /** {@inheritDoc} */ - @Override public Collection<CacheMetrics> cacheMetrics() { - return Collections.<CacheMetrics>singletonList(new CacheMetricsSnapshot()); + @Override public Map<String, CacheMetrics> cacheMetrics() { + return Collections.<String, CacheMetrics>singletonMap(null, new CacheMetricsSnapshot()); } }; }