ignite-366 Metrics for caches should work in clustered mode
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b4c51f47 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b4c51f47 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b4c51f47 Branch: refs/heads/ignite-366 Commit: b4c51f47543bbfd663b956eab124541446072eb0 Parents: 7e86251 Author: Andrey Gura <ag...@gridgain.com> Authored: Wed Apr 1 21:02:20 2015 +0300 Committer: Andrey Gura <ag...@gridgain.com> Committed: Thu Apr 2 22:18:39 2015 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/cluster/ClusterNode.java | 14 +++++ .../discovery/GridDiscoveryManager.java | 17 ++++++ .../processors/cache/CacheMetricsSnapshot.java | 9 ++- .../spi/discovery/DiscoveryMetricsProvider.java | 10 ++++ .../discovery/tcp/TcpClientDiscoverySpi.java | 38 ++++++++++++- .../tcp/internal/TcpDiscoveryNode.java | 24 ++++++++ .../messages/TcpDiscoveryHeartbeatMessage.java | 58 ++++++++++++++++++++ .../ignite/internal/GridDiscoverySelfTest.java | 5 ++ ...idCachePartitionedHitsAndMissesSelfTest.java | 9 ++- .../ignite/p2p/GridP2PClassLoadingSelfTest.java | 6 ++ .../ignite/testframework/GridTestNode.java | 8 +++ .../junits/spi/GridSpiAbstractTest.java | 7 +++ 12 files changed, 199 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4c51f47/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java index 9cb5d3d..1c2e829 100644 --- a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java +++ b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java @@ -18,6 +18,7 
@@ package org.apache.ignite.cluster; import org.apache.ignite.*; +import org.apache.ignite.cache.*; import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; @@ -148,6 +149,19 @@ public interface ClusterNode { public ClusterMetrics metrics(); /** + * Gets a collection of cache metrics snapshots for this node. Note that node cache metrics are constantly updated + * and provide up to date information about caches. For example, you can get + * an idea about cache hits on remote node via {@link CacheMetrics#getCacheHits()} method. + * <p> + * Cache metrics are updated with some delay which is directly related to heartbeat + * frequency. For example, when used with default + * {@link org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi} the update will happen every {@code 2} seconds. + * + * @return Cache metrics snapshots for this node. + */ + public Collection<CacheMetrics> cacheMetrics(); + + /** * Gets all node attributes. Attributes are assigned to nodes at startup * via {@link org.apache.ignite.configuration.IgniteConfiguration#getUserAttributes()} method.
* <p> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4c51f47/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 04ff423..5fb8616 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.managers.discovery; import org.apache.ignite.*; +import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.*; @@ -26,6 +27,7 @@ import org.apache.ignite.internal.managers.*; import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.managers.eventstorage.*; import org.apache.ignite.internal.processors.affinity.*; +import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.jobmetrics.*; import org.apache.ignite.internal.processors.security.*; import org.apache.ignite.internal.util.*; @@ -644,6 +646,21 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { return nm; } + + /** {@inheritDoc} */ + @Override public Collection<CacheMetrics> cacheMetrics() { + Collection<GridCache<?, ?>> caches = ctx.cache().caches(); + + if (F.isEmpty(caches)) + return Collections.emptyList(); + + List<CacheMetrics> metrics = new ArrayList<>(caches.size()); + + for (GridCache<?, ?> cache : caches) + metrics.add(cache.metrics()); + + return metrics; + } }; } 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4c51f47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java index 0391f4e..e20259f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java @@ -23,7 +23,7 @@ import org.apache.ignite.internal.util.typedef.internal.*; /** * Metrics snapshot. */ -class CacheMetricsSnapshot implements CacheMetrics { +public class CacheMetricsSnapshot implements CacheMetrics { /** Number of reads. */ private long reads = 0; @@ -178,6 +178,13 @@ class CacheMetricsSnapshot implements CacheMetrics { private boolean isWriteThrough; /** + * Default constructor. + */ + public CacheMetricsSnapshot() { + // No-op. + } + + /** * Create snapshot for given metrics. * * @param m Cache metrics. 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4c51f47/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryMetricsProvider.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryMetricsProvider.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryMetricsProvider.java index 4a03278..f3c98de 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryMetricsProvider.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryMetricsProvider.java @@ -17,9 +17,12 @@ package org.apache.ignite.spi.discovery; +import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.util.tostring.*; +import java.util.*; + /** * Provides metrics to discovery SPI. It is responsibility of discovery SPI * to make sure that all nodes have updated metrics data about each other. @@ -36,4 +39,11 @@ public interface DiscoveryMetricsProvider { * @return Up to date metrics data about local node. */ public ClusterMetrics metrics(); + + /** + * Returns metrics data about all caches on local node. + * + * @return metrics data about all caches on local node. 
+ */ + public Collection<CacheMetrics> cacheMetrics(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4c51f47/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java index 48125bf..a3bff02 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java @@ -18,6 +18,7 @@ package org.apache.ignite.spi.discovery.tcp; import org.apache.ignite.*; +import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.*; @@ -1068,7 +1069,10 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp Socket sock0 = sock; if (sock0 != null) { - msg.setMetrics(ignite.configuration().getNodeId(), metricsProvider.metrics()); + UUID nodeId = ignite.configuration().getNodeId(); + + msg.setMetrics(nodeId, metricsProvider.metrics()); + msg.setCacheMetrics(nodeId, metricsProvider.cacheMetrics()); try { writeToSocket(sock0, msg); @@ -1095,9 +1099,9 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp log.debug("Received heartbeat response: " + msg); } else { - if (msg.hasMetrics()) { - long tstamp = U.currentTimeMillis(); + long tstamp = U.currentTimeMillis(); + if (msg.hasMetrics()) { for (Map.Entry<UUID, MetricsSet> e : msg.metrics().entrySet()) { MetricsSet metricsSet = e.getValue(); @@ -1107,6 +1111,11 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp updateMetrics(t.get1(), t.get2(), tstamp); } } + + if (msg.hasCacheMetrics()) { + for (Map.Entry<UUID,Collection<CacheMetrics>> e : 
msg.cacheMetrics().entrySet()) + updateCacheMetrics(e.getKey(), e.getValue(), tstamp); + } } } @@ -1172,6 +1181,29 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp } /** + * @param nodeId Node ID. + * @param cacheMetrics Cache metrics. + * @param tstamp Timestamp. + */ + private void updateCacheMetrics(UUID nodeId, Collection<CacheMetrics> cacheMetrics, long tstamp) { + assert nodeId != null; + assert cacheMetrics != null; + + TcpDiscoveryNode node = nodeId.equals(ignite.configuration().getNodeId()) ? locNode : rmtNodes.get(nodeId); + + if (node != null && node.visible()) { + node.setCacheMetrics(cacheMetrics); + + node.lastUpdateTime(tstamp); + + notifyDiscovery(EVT_NODE_METRICS_UPDATED, topVer, node, allNodes()); + } + else if (log.isDebugEnabled()) + log.debug("Received cacheMetrics from unknown node: " + nodeId); + } + + + /** * @param topVer New topology version. * @return Latest topology snapshot. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4c51f47/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java index 450dd8c..89924fb 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java @@ -17,6 +17,7 @@ package org.apache.ignite.spi.discovery.tcp.internal; +import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.lang.*; @@ -73,6 +74,10 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste @GridToStringExclude private volatile 
ClusterMetrics metrics; + /** Node cache metrics. */ + @GridToStringExclude + private volatile Collection<CacheMetrics> cacheMetrics; + /** Node order in the topology. */ private volatile long order; @@ -209,6 +214,25 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste this.metrics = metrics; } + /** {@inheritDoc} */ + @Override public Collection<CacheMetrics> cacheMetrics() { + if (metricsProvider != null) + cacheMetrics = metricsProvider.cacheMetrics(); + + return cacheMetrics; + } + + /** + * Sets node cache metrics. + * + * @param cacheMetrics Cache metrics. + */ + public void setCacheMetrics(Collection<CacheMetrics> cacheMetrics) { + assert cacheMetrics != null; + + this.cacheMetrics = cacheMetrics; + } + /** * @return Internal order. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4c51f47/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java index 65eea9f..095edb7 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java @@ -17,6 +17,7 @@ package org.apache.ignite.spi.discovery.tcp.messages; +import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.tostring.*; @@ -52,6 +53,10 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage { /** Client node IDs. */ private Collection<UUID> clientNodeIds; + /** Cache metrics by node.
*/ + @GridToStringExclude + private Map<UUID, Collection<CacheMetrics>> cacheMetrics; + /** * Public default no-arg constructor for {@link Externalizable} interface. */ @@ -68,6 +73,7 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage { super(creatorNodeId); metrics = U.newHashMap(1); + cacheMetrics = U.newHashMap(1); clientNodeIds = new HashSet<>(); } @@ -86,6 +92,20 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage { } /** + * Sets cache metrics for particular node. + * + * @param nodeId Node ID. + * @param metrics Node cache metrics. + */ + public void setCacheMetrics(UUID nodeId, Collection<CacheMetrics> metrics) { + assert nodeId != null; + assert metrics != null; + assert !this.cacheMetrics.containsKey(nodeId); + + this.cacheMetrics.put(nodeId, metrics); + } + + /** * Sets metrics for a client node. * * @param nodeId Server node ID. @@ -113,6 +133,17 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage { } /** + * Removes cache metrics for particular node from the message. + * + * @param nodeId Node ID. + */ + public void removeCacheMetrics(UUID nodeId) { + assert nodeId != null; + + cacheMetrics.remove(nodeId); + } + + /** * Gets metrics map. * * @return Metrics map. @@ -122,6 +153,15 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage { } /** + * Gets cache metrics map. + * + * @return Cache metrics map. + */ + public Map<UUID, Collection<CacheMetrics>> cacheMetrics() { + return cacheMetrics; + } + + /** * @return {@code True} if this message contains metrics. */ public boolean hasMetrics() { @@ -129,6 +169,13 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage { } /** + * @return {@code True} if this message contains cache metrics. + */ + public boolean hasCacheMetrics() { + return !cacheMetrics.isEmpty(); + } + + /** * @return {@code True} if this message contains metrics.
*/ public boolean hasMetrics(UUID nodeId) { @@ -138,6 +185,17 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage { } /** + * @param nodeId Node ID. + * + * @return {@code True} if this message contains cache metrics for particular node. + */ + public boolean hasCacheMetrics(UUID nodeId) { + assert nodeId != null; + + return cacheMetrics.get(nodeId) != null; + } + + /** * Gets client node IDs for particular node. * * @return Client node IDs. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4c51f47/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java index 4a2483d..3c2ccbb 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java @@ -376,6 +376,11 @@ public class GridDiscoverySelfTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ + @Override public Collection<CacheMetrics> cacheMetrics() { + return null; + } + + /** {@inheritDoc} */ @Nullable @Override public Map<String, Object> attributes() { return null; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4c51f47/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java index f5eb189..0d6d30a 100644 --- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java @@ -115,13 +115,18 @@ public class GridCachePartitionedHitsAndMissesSelfTest extends GridCommonAbstrac for (int i = 0; i < GRID_CNT; i++) { CacheMetrics m = grid(i).cache(null).metrics(); - hits += m.getCacheHits(); - misses += m.getCacheMisses(); + long cacheHits = m.getCacheHits(); + hits += cacheHits; + long cacheMisses = m.getCacheMisses(); + misses += cacheMisses; + + System.out.println("!!! Grid(" + i + "): hits = " + cacheHits + ", misses = " + cacheMisses); } // Check that invoke and loader updated metrics assertEquals(CNT, hits); assertEquals(CNT, misses); + System.out.println("!!! Grid: hits = " + hits + ", misses = " + hits); } finally { stopAllGrids(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4c51f47/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PClassLoadingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PClassLoadingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PClassLoadingSelfTest.java index 9700d94..d9ffc8e 100644 --- a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PClassLoadingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PClassLoadingSelfTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.p2p; import org.apache.ignite.*; +import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.compute.*; import org.apache.ignite.internal.util.lang.*; @@ -115,6 +116,11 @@ public class GridP2PClassLoadingSelfTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ + @Override public Collection<CacheMetrics> cacheMetrics() { + return null; + } + + /** 
{@inheritDoc} */ @Nullable @Override public Map<String, Object> attributes() { return null; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4c51f47/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java index 5de1f14..e5eea88 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java @@ -17,6 +17,7 @@ package org.apache.ignite.testframework; +import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.lang.*; @@ -56,6 +57,9 @@ public class GridTestNode extends GridMetadataAwareAdapter implements ClusterNod private ClusterMetrics metrics; /** */ + private Collection<CacheMetrics> cacheMetrics = Collections.emptyList(); + + /** */ private long order; /** */ @@ -186,6 +190,10 @@ public class GridTestNode extends GridMetadataAwareAdapter implements ClusterNod return metrics; } + @Override public Collection<CacheMetrics> cacheMetrics() { + return cacheMetrics; + } + /** {@inheritDoc} */ @Override public long order() { return order != 0 ? order : (metrics == null ? 
-1 : metrics.getStartTime()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4c51f47/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java index 7898c3d..4d65587 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java @@ -18,8 +18,10 @@ package org.apache.ignite.testframework.junits.spi; import org.apache.ignite.*; +import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.plugin.security.*; @@ -348,6 +350,11 @@ public abstract class GridSpiAbstractTest<T extends IgniteSpi> extends GridAbstr return new DiscoveryMetricsProvider() { /** {@inheritDoc} */ @Override public ClusterMetrics metrics() { return new ClusterMetricsSnapshot(); } + + /** {@inheritDoc} */ + @Override public Collection<CacheMetrics> cacheMetrics() { + return Collections.<CacheMetrics>singletonList(new CacheMetricsSnapshot()); + } }; }