IGNITE-49 Removed DiscoveryMetricsHelper
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/abe5d178 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/abe5d178 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/abe5d178 Branch: refs/heads/ignite-6 Commit: abe5d178983c886c8feefe58daa7b33a30d495ce Parents: f1a8f16 Author: nikolay_tikhonov <ntikho...@gridgain.com> Authored: Tue Jan 27 15:55:11 2015 +0300 Committer: nikolay_tikhonov <ntikho...@gridgain.com> Committed: Tue Jan 27 15:55:11 2015 +0300 ---------------------------------------------------------------------- .../ignite/internal/ClusterGroupAdapter.java | 2 +- .../ignite/internal/ClusterMetricsImpl.java | 627 -------- .../ignite/internal/ClusterMetricsSnapshot.java | 1454 ++++++++++++++++++ .../spi/discovery/ClusterMetricsSnapshot.java | 1118 -------------- .../spi/discovery/DiscoveryMetricsHelper.java | 368 ----- .../tcp/internal/TcpDiscoveryNode.java | 6 +- .../messages/TcpDiscoveryHeartbeatMessage.java | 16 +- ...bStealingCollisionSpiAttributesSelfTest.java | 2 +- ...alingCollisionSpiCustomTopologySelfTest.java | 2 +- .../GridJobStealingCollisionSpiSelfTest.java | 2 +- ...ClusterMetricsSnapshotSerializeSelfTest.java | 126 ++ .../GridDiscoveryMetricsHelperSelfTest.java | 149 -- .../testframework/GridSpiTestContext.java | 2 +- .../testsuites/IgniteUtilSelfTestSuite.java | 2 +- .../util/GridTopologyHeapSizeSelfTest.java | 2 +- 15 files changed, 1597 insertions(+), 2281 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/abe5d178/modules/core/src/main/java/org/apache/ignite/internal/ClusterGroupAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/ClusterGroupAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/ClusterGroupAdapter.java index 36252b5..1086fbf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/ClusterGroupAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/ClusterGroupAdapter.java @@ -261,7 +261,7 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable { if (nodes().isEmpty()) throw U.emptyTopologyException(); - return new ClusterMetricsImpl(this); + return new ClusterMetricsSnapshot(this); } finally { unguard(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/abe5d178/modules/core/src/main/java/org/apache/ignite/internal/ClusterMetricsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/ClusterMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/ClusterMetricsImpl.java deleted file mode 100644 index 113a1bc..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/ClusterMetricsImpl.java +++ /dev/null @@ -1,627 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal; - -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -import static java.lang.Math.*; - -/** - * Implementation for {@link org.apache.ignite.cluster.ClusterMetrics ClusterNodeMetrics} interface. - */ -class ClusterMetricsImpl implements ClusterMetrics { - - /** */ - private int totalExecutedTask; - - /** */ - private int maxActJobs = Integer.MIN_VALUE; - - /** */ - private int curActJobs; - - /** */ - private int totalExecutedJobs; - - /** */ - private float avgActJobs; - - /** */ - private int curCancelJobs; - - /** */ - private int totalCancelJobs; - - /** */ - private int maxCancelJobs = Integer.MIN_VALUE; - - /** */ - private float avgCancelJobs; - - /** */ - private int curRejectJobs; - - /** */ - private int maxRejectJobs = Integer.MIN_VALUE; - - /** */ - private float avgRejectJobs; - - /** */ - private int totalRejectJobs; - - /** */ - private int curWaitJobs; - - /** */ - private int maxWaitJobs = Integer.MIN_VALUE; - - /** */ - private float avgWaitJobs; - - /** */ - private long maxJobExecTime = Long.MIN_VALUE; - - /** */ - private double avgJobExecTime; - - /** */ - private long minJobWaitTime = Long.MAX_VALUE; - - /** */ - private long maxJobWaitTime = Long.MIN_VALUE; - - /** */ - private double avgJobWaitTime; - - /** */ - private int curDaemonThreadCnt; - - /** */ - private int maxThreadCnt = Integer.MIN_VALUE; - - /** */ - private int curThreadCnt; - - /** */ - private int totalStartedThreadCnt; - - /** */ - private long minIdleTime = Long.MAX_VALUE; - - /** */ - private float avgIdleTimePercent; - - /** */ - private int totalIdleTime; - - /** */ - private float avgBusyTimePerc; - - /** */ - private int totalBusyTime; - - /** */ - private double avgCpuLoad; - - /** */ - private long totalHeapMemCmt = Long.MIN_VALUE; - - /** */ - private long totalHeapMemUsed; - - /** */ - private long maxHeapMemMax = Long.MIN_VALUE; - - /** */ - private long totalHeapMemInit; - - /** */ - private long totalNonHeapMemCmt; - - /** */ - private long totalNonHeapMemUsed; - - /** */ - private long maxNonHeapMemMax = Long.MIN_VALUE; - - /** */ - private long totalNonHeapMemInit; - - /** */ - private long oldestNodeStartTime = Long.MAX_VALUE; - - /** */ - private long maxUpTime = Long.MIN_VALUE; - - /** */ - private int totalCpus; - - /** */ - private int currentGcCpus; - - /** */ - private long lastUpdateTime; - - /** */ - private long lastDataVersion = Long.MIN_VALUE; - - /** */ - private int sentMessagesCnt; - - /** */ - private long sentBytesCnt; - - /** */ - private int receivedMessagesCnt; - - /** */ - private long receivedBytesCnt; - - /** */ - private int outboundMessagesQueueSize; - - /** - * @param p Projection to get metrics for. - */ - ClusterMetricsImpl(ClusterGroup p) { - assert p != null; - - Collection<ClusterNode> nodes = p.nodes(); - - int size = nodes.size(); - - for (ClusterNode node : nodes) { - ClusterMetrics m = node.metrics(); - - lastUpdateTime = max(lastUpdateTime, node.metrics().getLastUpdateTime()); - - curActJobs += m.getCurrentActiveJobs(); - maxActJobs = max(maxActJobs, m.getCurrentActiveJobs()); - avgActJobs += m.getCurrentActiveJobs(); - totalExecutedJobs += m.getTotalExecutedJobs(); - - totalExecutedTask += m.getTotalExecutedTasks(); - - totalCancelJobs += m.getTotalCancelledJobs(); - curCancelJobs += m.getCurrentCancelledJobs(); - maxCancelJobs = max(maxCancelJobs, m.getCurrentCancelledJobs()); - avgCancelJobs += m.getCurrentCancelledJobs(); - - totalRejectJobs += m.getTotalRejectedJobs(); - curRejectJobs += m.getCurrentRejectedJobs(); - totalRejectJobs += m.getTotalRejectedJobs(); - maxRejectJobs = max(maxRejectJobs, m.getCurrentRejectedJobs()); - avgRejectJobs += m.getCurrentRejectedJobs(); - - curWaitJobs += m.getCurrentJobWaitTime(); - maxWaitJobs = max(maxWaitJobs, m.getCurrentWaitingJobs()); - avgWaitJobs += m.getCurrentWaitingJobs(); - - maxJobExecTime = max(maxJobExecTime, m.getCurrentJobExecuteTime()); - avgJobExecTime += m.getCurrentJobExecuteTime(); - - minJobWaitTime = min(minJobWaitTime, m.getCurrentJobWaitTime()); - maxJobWaitTime = max(maxJobWaitTime, m.getCurrentJobWaitTime()); - avgJobWaitTime += m.getCurrentJobWaitTime(); - - curDaemonThreadCnt += m.getCurrentDaemonThreadCount(); - - maxThreadCnt = max(maxThreadCnt, m.getCurrentThreadCount()); - curThreadCnt += m.getCurrentThreadCount(); - totalStartedThreadCnt += m.getTotalStartedThreadCount(); - - minIdleTime = min(minIdleTime, m.getCurrentIdleTime()); - avgIdleTimePercent += m.getIdleTimePercentage(); - totalIdleTime += m.getTotalIdleTime(); - - avgBusyTimePerc += m.getBusyTimePercentage(); - totalBusyTime += m.getTotalBusyTime(); - - avgCpuLoad += m.getCurrentCpuLoad(); - - totalHeapMemCmt += m.getHeapMemoryCommitted(); - - totalHeapMemUsed += m.getHeapMemoryUsed(); - - maxHeapMemMax = max(maxHeapMemMax, m.getHeapMemoryMaximum()); - - totalHeapMemInit += m.getHeapMemoryInitialized(); - - totalNonHeapMemCmt += m.getNonHeapMemoryCommitted(); - - totalNonHeapMemUsed += m.getNonHeapMemoryUsed(); - - maxNonHeapMemMax = max(maxNonHeapMemMax, m.getNonHeapMemoryMaximum()); - - totalNonHeapMemInit += m.getNonHeapMemoryInitialized(); - - maxUpTime = max(maxUpTime, m.getUpTime()); - - lastDataVersion = max(lastDataVersion, m.getLastDataVersion()); - - sentMessagesCnt += m.getSentMessagesCount(); - sentBytesCnt += m.getSentBytesCount(); - receivedMessagesCnt += m.getReceivedMessagesCount(); - receivedBytesCnt += m.getReceivedBytesCount(); - outboundMessagesQueueSize += m.getOutboundMessagesQueueSize(); - } - - avgActJobs /= size; - avgCancelJobs /= size; - avgRejectJobs /= size; - avgWaitJobs /= size; - avgJobExecTime /= size; - avgJobWaitTime /= size; - avgIdleTimePercent /= size; - avgBusyTimePerc /= size; - avgCpuLoad /= size; - - if (!F.isEmpty(nodes)) { - oldestNodeStartTime = oldest(nodes).metrics().getNodeStartTime(); - } - - Map<String, Collection<ClusterNode>> neighborhood = U.neighborhood(nodes); - - currentGcCpus = gcCpus(neighborhood); - totalCpus = cpus(neighborhood); - } - - /** {@inheritDoc} */ - @Override public int getMaximumActiveJobs() { - return maxActJobs; - } - - /** {@inheritDoc} */ - @Override public float getAverageActiveJobs() { - return avgActJobs; - } - - /** {@inheritDoc} */ - @Override public int getMaximumCancelledJobs() { - return maxCancelJobs; - } - - /** {@inheritDoc} */ - @Override public float getAverageCancelledJobs() { - return avgCancelJobs; - } - - /** {@inheritDoc} */ - @Override public int getMaximumRejectedJobs() { - return maxRejectJobs; - } - - /** {@inheritDoc} */ - @Override public float getAverageRejectedJobs() { - return avgRejectJobs; - } - - /** {@inheritDoc} */ - @Override public int getMaximumWaitingJobs() { - return maxWaitJobs; - } - - /** {@inheritDoc} */ - @Override public float getAverageWaitingJobs() { - return avgWaitJobs; - } - - /** {@inheritDoc} */ - @Override public long getMaximumJobExecuteTime() { - return maxJobExecTime; - } - - /** {@inheritDoc} */ - @Override public double getAverageJobExecuteTime() { - return avgJobExecTime; - } - - /** {@inheritDoc} */ - @Override public long getMaximumJobWaitTime() { - return maxJobWaitTime; - } - - /** {@inheritDoc} */ - @Override public double getAverageJobWaitTime() { - return avgJobWaitTime; - } - - /** {@inheritDoc} */ - @Override public int getMaximumThreadCount() { - return maxThreadCnt; - } - - /** {@inheritDoc} */ - @Override public float getIdleTimePercentage() { - return avgIdleTimePercent; - } - - /** {@inheritDoc} */ - @Override public double getAverageCpuLoad() { - return avgCpuLoad; - } - - /** {@inheritDoc} */ - @Override public int getTotalCpus() { - return totalCpus; - } - - /** {@inheritDoc} */ - @Override public long getLastUpdateTime() { - return lastUpdateTime; - } - - /** {@inheritDoc} */ - @Override public int getCurrentActiveJobs() { - return curActJobs; - } - - /** {@inheritDoc} */ - @Override public int getCurrentWaitingJobs() { - return curWaitJobs; - } - - /** {@inheritDoc} */ - @Override public int getCurrentRejectedJobs() { - return curRejectJobs; - } - - /** {@inheritDoc} */ - @Override public int getTotalRejectedJobs() { - return totalRejectJobs; - } - - /** {@inheritDoc} */ - @Override public int getCurrentCancelledJobs() { - return curCancelJobs; - } - - /** {@inheritDoc} */ - @Override public int getTotalCancelledJobs() { - return totalCancelJobs; - } - - /** {@inheritDoc} */ - @Override public int getTotalExecutedJobs() { - return totalExecutedJobs; - } - - /** {@inheritDoc} */ - @Override public long getCurrentJobWaitTime() { - return minJobWaitTime; - } - - /** {@inheritDoc} */ - @Override public long getCurrentJobExecuteTime() { - return maxJobExecTime; - } - - /** {@inheritDoc} */ - @Override public int getTotalExecutedTasks() { - return totalExecutedTask; - } - - /** {@inheritDoc} */ - @Override public long getTotalBusyTime() { - return totalBusyTime; - } - - /** {@inheritDoc} */ - @Override public long getTotalIdleTime() { - return totalIdleTime; - } - - /** {@inheritDoc} */ - @Override public long getCurrentIdleTime() { - return minIdleTime; - } - - /** {@inheritDoc} */ - @Override public float getBusyTimePercentage() { - return avgBusyTimePerc; - } - - /** {@inheritDoc} */ - @Override public double getCurrentCpuLoad() { - return totalCpus; - } - - /** {@inheritDoc} */ - @Override public double getCurrentGcCpuLoad() { - return currentGcCpus; - } - - /** {@inheritDoc} */ - @Override public long getHeapMemoryInitialized() { - return totalHeapMemInit; - } - - /** {@inheritDoc} */ - @Override public long getHeapMemoryUsed() { - return totalHeapMemUsed; - } - - /** {@inheritDoc} */ - @Override public long getHeapMemoryCommitted() { - return totalHeapMemCmt; - } - - /** {@inheritDoc} */ - @Override public long getHeapMemoryMaximum() { - return maxHeapMemMax; - } - - /** {@inheritDoc} */ - @Override public long getNonHeapMemoryInitialized() { - return totalNonHeapMemInit; - } - - /** {@inheritDoc} */ - @Override public long getNonHeapMemoryUsed() { - return totalNonHeapMemUsed; - } - - /** {@inheritDoc} */ - @Override public long getNonHeapMemoryCommitted() { - return totalNonHeapMemCmt; - } - - /** {@inheritDoc} */ - @Override public long getNonHeapMemoryMaximum() { - return maxNonHeapMemMax; - } - - /** {@inheritDoc} */ - @Override public long getUpTime() { - return maxUpTime; - } - - /** {@inheritDoc} */ - @Override public long getStartTime() { - return oldestNodeStartTime; - } - - /** {@inheritDoc} */ - @Override public long getNodeStartTime() { - return oldestNodeStartTime; - } - - /** {@inheritDoc} */ - @Override public int getCurrentThreadCount() { - return curThreadCnt; - } - - /** {@inheritDoc} */ - @Override public long getTotalStartedThreadCount() { - return totalStartedThreadCnt; - } - - /** {@inheritDoc} */ - @Override public int getCurrentDaemonThreadCount() { - return curDaemonThreadCnt; - } - - /** {@inheritDoc} */ - @Override public long getLastDataVersion() { - return lastDataVersion; - } - - /** {@inheritDoc} */ - @Override public int getSentMessagesCount() { - return sentMessagesCnt; - } - - /** {@inheritDoc} */ - @Override public long getSentBytesCount() { - return sentBytesCnt; - } - - /** {@inheritDoc} */ - @Override public int getReceivedMessagesCount() { - return receivedMessagesCnt; - } - - /** {@inheritDoc} */ - @Override public long getReceivedBytesCount() { - return receivedBytesCnt; - } - - /** {@inheritDoc} */ - @Override public int getOutboundMessagesQueueSize() { - return outboundMessagesQueueSize; - } - - /** - * Gets the youngest node in given collection. - * - * @param nodes Nodes. - * @return Youngest node or {@code null} if collection is empty. - */ - @Nullable private static ClusterNode youngest(Collection<ClusterNode> nodes) { - long max = Long.MIN_VALUE; - - ClusterNode youngest = null; - - for (ClusterNode n : nodes) - if (n.order() > max) { - max = n.order(); - youngest = n; - } - - return youngest; - } - - /** - * Gets the oldest node in given collection. - * - * @param nodes Nodes. - * @return Oldest node or {@code null} if collection is empty. - */ - @Nullable private static ClusterNode oldest(Collection<ClusterNode> nodes) { - long min = Long.MAX_VALUE; - - ClusterNode oldest = null; - - for (ClusterNode n : nodes) - if (n.order() < min) { - min = n.order(); - oldest = n; - } - - return oldest; - } - - private static int cpus(Map<String, Collection<ClusterNode>> neighborhood) { - int cpus = 0; - - for (Collection<ClusterNode> nodes : neighborhood.values()) { - ClusterNode first = F.first(nodes); - - // Projection can be empty if all nodes in it failed. - if (first != null) - cpus += first.metrics().getTotalCpus(); - } - - return cpus; - } - - private static int gcCpus(Map<String, Collection<ClusterNode>> neighborhood) { - int cpus = 0; - - for (Collection<ClusterNode> nodes : neighborhood.values()) { - ClusterNode first = F.first(nodes); - - // Projection can be empty if all nodes in it failed. - if (first != null) - cpus += first.metrics().getCurrentGcCpuLoad(); - } - - return cpus; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(ClusterMetricsImpl.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/abe5d178/modules/core/src/main/java/org/apache/ignite/internal/ClusterMetricsSnapshot.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/ClusterMetricsSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/ClusterMetricsSnapshot.java new file mode 100644 index 0000000..89e1037 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/ClusterMetricsSnapshot.java @@ -0,0 +1,1454 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +import static java.lang.Math.*; + +/** + * Implementation for {@link ClusterMetrics} interface. + * <p> + * Note that whenever adding or removing metric parameters, care + * must be taken to update serialize/deserialize logic as well. + */ +public class ClusterMetricsSnapshot implements ClusterMetrics { + /** Size of serialized node metrics. */ + public static final int METRICS_SIZE = + 4/*max active jobs*/ + + 4/*current active jobs*/ + + 4/*average active jobs*/ + + 4/*max waiting jobs*/ + + 4/*current waiting jobs*/ + + 4/*average waiting jobs*/ + + 4/*max cancelled jobs*/ + + 4/*current cancelled jobs*/ + + 4/*average cancelled jobs*/ + + 4/*max rejected jobs*/ + + 4/*current rejected jobs*/ + + 4/*average rejected jobs*/ + + 4/*total executed jobs*/ + + 4/*total rejected jobs*/ + + 4/*total cancelled jobs*/ + + 8/*max job wait time*/ + + 8/*current job wait time*/ + + 8/*average job wait time*/ + + 8/*max job execute time*/ + + 8/*current job execute time*/ + + 8/*average job execute time*/ + + 4/*total executed tasks*/ + + 8/*current idle time*/ + + 8/*total idle time*/ + + 4/*available processors*/ + + 8/*current CPU load*/ + + 8/*average CPU load*/ + + 8/*current GC CPU load*/ + + 8/*heap memory init*/ + + 8/*heap memory used*/ + + 8/*heap memory committed*/ + + 8/*heap memory max*/ + + 8/*non-heap memory init*/ + + 8/*non-heap memory used*/ + + 8/*non-heap memory committed*/ + + 8/*non-heap memory max*/ + + 8/*uptime*/ + + 8/*start time*/ + + 8/*node start time*/ + + 4/*thread count*/ + + 4/*peak thread count*/ + + 8/*total started thread count*/ + + 4/*daemon thread count*/ + + 8/*last data version.*/ + + 4/*sent messages count*/ + + 8/*sent bytes count*/ + + 4/*received messages count*/ + + 8/*received bytes count*/ + + 4/*outbound messages queue size*/; + + + /** */ + private long lastUpdateTime = -1; + + /** */ + private int maxActiveJobs = -1; + + /** */ + private int curActiveJobs = -1; + + /** */ + private float avgActiveJobs = -1; + + /** */ + private int maxWaitingJobs = -1; + + /** */ + private int curWaitingJobs = -1; + + /** */ + private float avgWaitingJobs = -1; + + /** */ + private int maxRejectedJobs = -1; + + /** */ + private int curRejectedJobs = -1; + + /** */ + private float avgRejectedJobs = -1; + + /** */ + private int maxCancelledJobs = -1; + + /** */ + private int curCancelledJobs = -1; + + /** */ + private float avgCancelledJobs = -1; + + /** */ + private int totalRejectedJobs = -1; + + /** */ + private int totalCancelledJobs = -1; + + /** */ + private int totalExecutedJobs = -1; + + /** */ + private long maxJobWaitTime = -1; + + /** */ + private long curJobWaitTime = -1; + + /** */ + private double avgJobWaitTime = -1; + + /** */ + private long maxJobExecTime = -1; + + /** */ + private long curJobExecTime = -1; + + /** */ + private double avgJobExecTime = -1; + + /** */ + private int totalExecTasks = -1; + + /** */ + private long totalIdleTime = -1; + + /** */ + private long curIdleTime = -1; + + /** */ + private int availProcs = -1; + + /** */ + private double load = -1; + + /** */ + private double avgLoad = -1; + + /** */ + private double gcLoad = -1; + + /** */ + private long heapInit = -1; + + /** */ + private long heapUsed = -1; + + /** */ + private long heapCommitted = -1; + + /** */ + private long heapMax = -1; + + /** */ + private long nonHeapInit = -1; + + /** */ + private long nonHeapUsed = -1; + + /** */ + private long nonHeapCommitted = -1; + + /** */ + private long nonHeapMax = -1; + + /** */ + private long upTime = -1; + + /** */ + private long startTime = -1; + + /** */ + private long nodeStartTime = -1; + + /** */ + private int threadCnt = -1; + + /** */ + private int peakThreadCnt = -1; + + /** */ + private long startedThreadCnt = -1; + + /** */ + private int daemonThreadCnt = -1; + + /** */ + private long lastDataVer = -1; + + /** */ + private int sentMsgsCnt = -1; + + /** */ + private long sentBytesCnt = -1; + + /** */ + private int rcvdMsgsCnt = -1; + + /** */ + private long rcvdBytesCnt = -1; + + /** */ + private int outMesQueueSize = -1; + + /** + * Create empty snapshot. + */ + public ClusterMetricsSnapshot() { + } + + /** + * Create metrics for given cluster group. + * + * @param p Projection to get metrics for. + */ + public ClusterMetricsSnapshot(ClusterGroup p) { + assert p != null; + + Collection<ClusterNode> nodes = p.nodes(); + + int size = nodes.size(); + + curJobWaitTime = Long.MAX_VALUE; + + for (ClusterNode node : nodes) { + ClusterMetrics m = node.metrics(); + + lastUpdateTime = max(lastUpdateTime, node.metrics().getLastUpdateTime()); + + curActiveJobs += m.getCurrentActiveJobs(); + maxActiveJobs = max(maxActiveJobs, m.getCurrentActiveJobs()); + avgActiveJobs += m.getCurrentActiveJobs(); + totalExecutedJobs += m.getTotalExecutedJobs(); + + totalExecTasks += m.getTotalExecutedTasks(); + + totalCancelledJobs += m.getTotalCancelledJobs(); + curCancelledJobs += m.getCurrentCancelledJobs(); + maxCancelledJobs = max(maxCancelledJobs, m.getCurrentCancelledJobs()); + avgCancelledJobs += m.getCurrentCancelledJobs(); + + totalRejectedJobs += m.getTotalRejectedJobs(); + curRejectedJobs += m.getCurrentRejectedJobs(); + maxRejectedJobs = max(maxRejectedJobs, m.getCurrentRejectedJobs()); + avgRejectedJobs += m.getCurrentRejectedJobs(); + + curWaitingJobs += m.getCurrentJobWaitTime(); + maxWaitingJobs = max(maxWaitingJobs, m.getCurrentWaitingJobs()); + avgWaitingJobs += m.getCurrentWaitingJobs(); + + maxJobExecTime = max(maxJobExecTime, m.getMaximumJobExecuteTime()); + avgJobExecTime += m.getAverageJobExecuteTime(); + curJobExecTime += m.getCurrentJobExecuteTime(); + + curJobWaitTime = min(curJobWaitTime, m.getCurrentJobWaitTime()); + maxJobWaitTime = max(maxJobWaitTime, m.getCurrentJobWaitTime()); + avgJobWaitTime += m.getCurrentJobWaitTime(); + + daemonThreadCnt += m.getCurrentDaemonThreadCount(); + + peakThreadCnt = max(peakThreadCnt, m.getCurrentThreadCount()); + threadCnt += m.getCurrentThreadCount(); + startedThreadCnt += m.getTotalStartedThreadCount(); + + curIdleTime += m.getCurrentIdleTime(); + totalIdleTime += m.getTotalIdleTime(); + + heapCommitted += m.getHeapMemoryCommitted(); + + heapUsed += m.getHeapMemoryUsed(); + + heapMax = max(heapMax, m.getHeapMemoryMaximum()); + + heapInit += m.getHeapMemoryInitialized(); + + nonHeapCommitted += m.getNonHeapMemoryCommitted(); + + nonHeapUsed += m.getNonHeapMemoryUsed(); + + nonHeapMax = max(nonHeapMax, m.getNonHeapMemoryMaximum()); + + nonHeapInit += m.getNonHeapMemoryInitialized(); + + upTime = max(upTime, m.getUpTime()); + + lastDataVer = max(lastDataVer, m.getLastDataVersion()); + + sentMsgsCnt += m.getSentMessagesCount(); + sentBytesCnt += m.getSentBytesCount(); + rcvdMsgsCnt += m.getReceivedMessagesCount(); + rcvdBytesCnt += m.getReceivedBytesCount(); + outMesQueueSize += m.getOutboundMessagesQueueSize(); + + avgLoad += m.getCurrentCpuLoad(); + availProcs += m.getTotalCpus(); + } + + curJobExecTime /= size; + + avgActiveJobs /= size; + avgCancelledJobs /= size; + avgRejectedJobs /= size; + avgWaitingJobs /= size; + avgJobExecTime /= size; + avgJobWaitTime /= size; + avgLoad /= size; + + if (!F.isEmpty(nodes)) { + ClusterMetrics oldestNodeMetrics = oldest(nodes).metrics(); + + nodeStartTime = oldestNodeMetrics.getNodeStartTime(); + startTime = oldestNodeMetrics.getStartTime(); + } + + Map<String, Collection<ClusterNode>> neighborhood = U.neighborhood(nodes); + + gcLoad = gcCpus(neighborhood); + load = cpus(neighborhood); + } + + /** {@inheritDoc} */ + @Override public long getLastUpdateTime() { + return lastUpdateTime; + } + + /** + * Sets last update time. + * + * @param lastUpdateTime Last update time. + */ + public void setLastUpdateTime(long lastUpdateTime) { + this.lastUpdateTime = lastUpdateTime; + } + + /** {@inheritDoc} */ + @Override public int getMaximumActiveJobs() { + return maxActiveJobs; + } + + /** + * Sets max active jobs. + * + * @param maxActiveJobs Max active jobs. + */ + public void setMaximumActiveJobs(int maxActiveJobs) { + this.maxActiveJobs = maxActiveJobs; + } + + /** {@inheritDoc} */ + @Override public int getCurrentActiveJobs() { + return curActiveJobs; + } + + /** + * Sets current active jobs. + * + * @param curActiveJobs Current active jobs. + */ + public void setCurrentActiveJobs(int curActiveJobs) { + this.curActiveJobs = curActiveJobs; + } + + /** {@inheritDoc} */ + @Override public float getAverageActiveJobs() { + return avgActiveJobs; + } + + /** + * Sets average active jobs. + * + * @param avgActiveJobs Average active jobs. + */ + public void setAverageActiveJobs(float avgActiveJobs) { + this.avgActiveJobs = avgActiveJobs; + } + + /** {@inheritDoc} */ + @Override public int getMaximumWaitingJobs() { + return maxWaitingJobs; + } + + /** + * Sets maximum waiting jobs. + * + * @param maxWaitingJobs Maximum waiting jobs. + */ + public void setMaximumWaitingJobs(int maxWaitingJobs) { + this.maxWaitingJobs = maxWaitingJobs; + } + + /** {@inheritDoc} */ + @Override public int getCurrentWaitingJobs() { + return curWaitingJobs; + } + + /** + * Sets current waiting jobs. + * + * @param curWaitingJobs Current waiting jobs. + */ + public void setCurrentWaitingJobs(int curWaitingJobs) { + this.curWaitingJobs = curWaitingJobs; + } + + /** {@inheritDoc} */ + @Override public float getAverageWaitingJobs() { + return avgWaitingJobs; + } + + /** + * Sets average waiting jobs. + * + * @param avgWaitingJobs Average waiting jobs. + */ + public void setAverageWaitingJobs(float avgWaitingJobs) { + this.avgWaitingJobs = avgWaitingJobs; + } + + /** {@inheritDoc} */ + @Override public int getMaximumRejectedJobs() { + return maxRejectedJobs; + } + + /** + * @param maxRejectedJobs Maximum number of jobs rejected during a single collision resolution event. + */ + public void setMaximumRejectedJobs(int maxRejectedJobs) { + this.maxRejectedJobs = maxRejectedJobs; + } + + /** {@inheritDoc} */ + @Override public int getCurrentRejectedJobs() { + return curRejectedJobs; + } + + /** + * @param curRejectedJobs Number of jobs rejected during most recent collision resolution. + */ + public void setCurrentRejectedJobs(int curRejectedJobs) { + this.curRejectedJobs = curRejectedJobs; + } + + /** {@inheritDoc} */ + @Override public float getAverageRejectedJobs() { + return avgRejectedJobs; + } + + /** + * @param avgRejectedJobs Average number of jobs this node rejects. + */ + public void setAverageRejectedJobs(float avgRejectedJobs) { + this.avgRejectedJobs = avgRejectedJobs; + } + + /** {@inheritDoc} */ + @Override public int getTotalRejectedJobs() { + return totalRejectedJobs; + } + + /** + * @param totalRejectedJobs Total number of jobs this node ever rejected. + */ + public void setTotalRejectedJobs(int totalRejectedJobs) { + this.totalRejectedJobs = totalRejectedJobs; + } + + /** {@inheritDoc} */ + @Override public int getMaximumCancelledJobs() { + return maxCancelledJobs; + } + + /** + * Sets maximum cancelled jobs. + * + * @param maxCancelledJobs Maximum cancelled jobs. + */ + public void setMaximumCancelledJobs(int maxCancelledJobs) { + this.maxCancelledJobs = maxCancelledJobs; + } + + /** {@inheritDoc} */ + @Override public int getCurrentCancelledJobs() { + return curCancelledJobs; + } + + /** + * Sets current cancelled jobs. + * + * @param curCancelledJobs Current cancelled jobs. + */ + public void setCurrentCancelledJobs(int curCancelledJobs) { + this.curCancelledJobs = curCancelledJobs; + } + + /** {@inheritDoc} */ + @Override public float getAverageCancelledJobs() { + return avgCancelledJobs; + } + + /** + * Sets average cancelled jobs. + * + * @param avgCancelledJobs Average cancelled jobs. + */ + public void setAverageCancelledJobs(float avgCancelledJobs) { + this.avgCancelledJobs = avgCancelledJobs; + } + + /** {@inheritDoc} */ + @Override public int getTotalExecutedJobs() { + return totalExecutedJobs; + } + + /** + * Sets total active jobs. + * + * @param totalExecutedJobs Total active jobs. + */ + public void setTotalExecutedJobs(int totalExecutedJobs) { + this.totalExecutedJobs = totalExecutedJobs; + } + + /** {@inheritDoc} */ + @Override public int getTotalCancelledJobs() { + return totalCancelledJobs; + } + + /** + * Sets total cancelled jobs. + * + * @param totalCancelledJobs Total cancelled jobs. + */ + public void setTotalCancelledJobs(int totalCancelledJobs) { + this.totalCancelledJobs = totalCancelledJobs; + } + + /** {@inheritDoc} */ + @Override public long getMaximumJobWaitTime() { + return maxJobWaitTime; + } + + /** + * Sets max job wait time. + * + * @param maxJobWaitTime Max job wait time. + */ + public void setMaximumJobWaitTime(long maxJobWaitTime) { + this.maxJobWaitTime = maxJobWaitTime; + } + + /** {@inheritDoc} */ + @Override public long getCurrentJobWaitTime() { + return curJobWaitTime; + } + + /** + * Sets current job wait time. + * + * @param curJobWaitTime Current job wait time. + */ + public void setCurrentJobWaitTime(long curJobWaitTime) { + this.curJobWaitTime = curJobWaitTime; + } + + /** {@inheritDoc} */ + @Override public double getAverageJobWaitTime() { + return avgJobWaitTime; + } + + /** + * Sets average job wait time. + * + * @param avgJobWaitTime Average job wait time. + */ + public void setAverageJobWaitTime(double avgJobWaitTime) { + this.avgJobWaitTime = avgJobWaitTime; + } + + /** {@inheritDoc} */ + @Override public long getMaximumJobExecuteTime() { + return maxJobExecTime; + } + + /** + * Sets maximum job execution time. + * + * @param maxJobExecTime Maximum job execution time. + */ + public void setMaximumJobExecuteTime(long maxJobExecTime) { + this.maxJobExecTime = maxJobExecTime; + } + + /** {@inheritDoc} */ + @Override public long getCurrentJobExecuteTime() { + return curJobExecTime; + } + + /** + * Sets current job execute time. + * + * @param curJobExecTime Current job execute time. + */ + public void setCurrentJobExecuteTime(long curJobExecTime) { + this.curJobExecTime = curJobExecTime; + } + + /** {@inheritDoc} */ + @Override public double getAverageJobExecuteTime() { + return avgJobExecTime; + } + + /** + * Sets average job execution time. + * + * @param avgJobExecTime Average job execution time. + */ + public void setAverageJobExecuteTime(double avgJobExecTime) { + this.avgJobExecTime = avgJobExecTime; + } + + /** {@inheritDoc} */ + @Override public int getTotalExecutedTasks() { + return totalExecTasks; + } + + /** + * Sets total executed tasks count. + * + * @param totalExecTasks total executed tasks count. + */ + public void setTotalExecutedTasks(int totalExecTasks) { + this.totalExecTasks = totalExecTasks; + } + + /** {@inheritDoc} */ + @Override public long getTotalBusyTime() { + return getUpTime() - getTotalIdleTime(); + } + + /** {@inheritDoc} */ + @Override public long getTotalIdleTime() { + return totalIdleTime; + } + + /** + * Set total node idle time. + * + * @param totalIdleTime Total node idle time. + */ + public void setTotalIdleTime(long totalIdleTime) { + this.totalIdleTime = totalIdleTime; + } + + /** {@inheritDoc} */ + @Override public long getCurrentIdleTime() { + return curIdleTime; + } + + /** + * Sets time elapsed since execution of last job. + * + * @param curIdleTime Time elapsed since execution of last job. + */ + public void setCurrentIdleTime(long curIdleTime) { + this.curIdleTime = curIdleTime; + } + + /** {@inheritDoc} */ + @Override public float getBusyTimePercentage() { + return 1 - getIdleTimePercentage(); + } + + /** {@inheritDoc} */ + @Override public float getIdleTimePercentage() { + return getTotalIdleTime() / (float)getUpTime(); + } + + /** {@inheritDoc} */ + @Override public int getTotalCpus() { + return availProcs; + } + + /** {@inheritDoc} */ + @Override public double getCurrentCpuLoad() { + return load; + } + + /** {@inheritDoc} */ + @Override public double getAverageCpuLoad() { + return avgLoad; + } + + /** {@inheritDoc} */ + @Override public double getCurrentGcCpuLoad() { + return gcLoad; + } + + /** {@inheritDoc} */ + @Override public long getHeapMemoryInitialized() { + return heapInit; + } + + /** {@inheritDoc} */ + @Override public long getHeapMemoryUsed() { + return heapUsed; + } + + /** {@inheritDoc} */ + @Override public long getHeapMemoryCommitted() { + return heapCommitted; + } + + /** {@inheritDoc} */ + @Override public long getHeapMemoryMaximum() { + return heapMax; + } + + /** {@inheritDoc} */ + @Override public long getNonHeapMemoryInitialized() { + return nonHeapInit; + } + + /** {@inheritDoc} */ + @Override public long getNonHeapMemoryUsed() { + return nonHeapUsed; + } + + /** {@inheritDoc} */ + @Override public long getNonHeapMemoryCommitted() { + return nonHeapCommitted; + } + + /** {@inheritDoc} */ + @Override public long getNonHeapMemoryMaximum() { + return nonHeapMax; + } + + /** {@inheritDoc} */ + @Override public long getUpTime() { + return upTime; + } + + /** {@inheritDoc} */ + @Override public long getStartTime() { + return startTime; + } + + /** {@inheritDoc} */ + @Override public long getNodeStartTime() { + return nodeStartTime; + } + + /** {@inheritDoc} */ + @Override public int getCurrentThreadCount() { + return threadCnt; + } + + /** {@inheritDoc} */ + @Override public int getMaximumThreadCount() { + return peakThreadCnt; + } + + /** {@inheritDoc} */ + @Override public long getTotalStartedThreadCount() { + return startedThreadCnt; + } + + /** {@inheritDoc} */ + @Override public int getCurrentDaemonThreadCount() { + return daemonThreadCnt; + } + + /** {@inheritDoc} */ + @Override public long getLastDataVersion() { + return lastDataVer; + } + + /** {@inheritDoc} */ + @Override public int getSentMessagesCount() { + return sentMsgsCnt; + } + + /** {@inheritDoc} */ + @Override public long getSentBytesCount() { + return sentBytesCnt; + } + + /** {@inheritDoc} */ + @Override public int getReceivedMessagesCount() { + return rcvdMsgsCnt; + } + + /** {@inheritDoc} */ + @Override public long getReceivedBytesCount() { + return rcvdBytesCnt; + } + + /** {@inheritDoc} */ + @Override public int getOutboundMessagesQueueSize() { + return outMesQueueSize; + } + + /** + * Sets available processors. + * + * @param availProcs Available processors. + */ + public void setAvailableProcessors(int availProcs) { + this.availProcs = availProcs; + } + + /** + * Sets current CPU load. + * + * @param load Current CPU load. + */ + public void setCurrentCpuLoad(double load) { + this.load = load; + } + + /** + * Sets CPU load average over the metrics history. + * + * @param avgLoad CPU load average. + */ + public void setAverageCpuLoad(double avgLoad) { + this.avgLoad = avgLoad; + } + + /** + * Sets current GC load. + * + * @param gcLoad Current GC load. + */ + public void setCurrentGcCpuLoad(double gcLoad) { + this.gcLoad = gcLoad; + } + + /** + * Sets heap initial memory. + * + * @param heapInit Heap initial memory. + */ + public void setHeapMemoryInitialized(long heapInit) { + this.heapInit = heapInit; + } + + /** + * Sets used heap memory. + * + * @param heapUsed Used heap memory. + */ + public void setHeapMemoryUsed(long heapUsed) { + this.heapUsed = heapUsed; + } + + /** + * Sets committed heap memory. + * + * @param heapCommitted Committed heap memory. + */ + public void setHeapMemoryCommitted(long heapCommitted) { + this.heapCommitted = heapCommitted; + } + + /** + * Sets maximum possible heap memory. + * + * @param heapMax Maximum possible heap memory. + */ + public void setHeapMemoryMaximum(long heapMax) { + this.heapMax = heapMax; + } + + /** + * Sets initial non-heap memory. + * + * @param nonHeapInit Initial non-heap memory. + */ + public void setNonHeapMemoryInitialized(long nonHeapInit) { + this.nonHeapInit = nonHeapInit; + } + + /** + * Sets used non-heap memory. + * + * @param nonHeapUsed Used non-heap memory. + */ + public void setNonHeapMemoryUsed(long nonHeapUsed) { + this.nonHeapUsed = nonHeapUsed; + } + + /** + * Sets committed non-heap memory. + * + * @param nonHeapCommitted Committed non-heap memory. + */ + public void setNonHeapMemoryCommitted(long nonHeapCommitted) { + this.nonHeapCommitted = nonHeapCommitted; + } + + /** + * Sets maximum possible non-heap memory. + * + * @param nonHeapMax Maximum possible non-heap memory. + */ + public void setNonHeapMemoryMaximum(long nonHeapMax) { + this.nonHeapMax = nonHeapMax; + } + + /** + * Sets VM up time. + * + * @param upTime VM up time. + */ + public void setUpTime(long upTime) { + this.upTime = upTime; + } + + /** + * Sets VM start time. + * + * @param startTime VM start time. + */ + public void setStartTime(long startTime) { + this.startTime = startTime; + } + + /** + * Sets node start time. + * + * @param nodeStartTime node start time. + */ + public void setNodeStartTime(long nodeStartTime) { + this.nodeStartTime = nodeStartTime; + } + + /** + * Sets thread count. + * + * @param threadCnt Thread count. + */ + public void setCurrentThreadCount(int threadCnt) { + this.threadCnt = threadCnt; + } + + /** + * Sets peak thread count. + * + * @param peakThreadCnt Peak thread count. + */ + public void setMaximumThreadCount(int peakThreadCnt) { + this.peakThreadCnt = peakThreadCnt; + } + + /** + * Sets started thread count. + * + * @param startedThreadCnt Started thread count. + */ + public void setTotalStartedThreadCount(long startedThreadCnt) { + this.startedThreadCnt = startedThreadCnt; + } + + /** + * Sets daemon thread count. + * + * @param daemonThreadCnt Daemon thread count. + */ + public void setCurrentDaemonThreadCount(int daemonThreadCnt) { + this.daemonThreadCnt = daemonThreadCnt; + } + + /** + * Sets last data version. + * + * @param lastDataVer Last data version. + */ + public void setLastDataVersion(long lastDataVer) { + this.lastDataVer = lastDataVer; + } + + /** + * Sets sent messages count. + * + * @param sentMsgsCnt Sent messages count. + */ + public void setSentMessagesCount(int sentMsgsCnt) { + this.sentMsgsCnt = sentMsgsCnt; + } + + /** + * Sets sent bytes count. + * + * @param sentBytesCnt Sent bytes count. + */ + public void setSentBytesCount(long sentBytesCnt) { + this.sentBytesCnt = sentBytesCnt; + } + + /** + * Sets received messages count. + * + * @param rcvdMsgsCnt Received messages count. + */ + public void setReceivedMessagesCount(int rcvdMsgsCnt) { + this.rcvdMsgsCnt = rcvdMsgsCnt; + } + + /** + * Sets received bytes count. + * + * @param rcvdBytesCnt Received bytes count. + */ + public void setReceivedBytesCount(long rcvdBytesCnt) { + this.rcvdBytesCnt = rcvdBytesCnt; + } + + /** + * Sets outbound messages queue size. + * + * @param outMesQueueSize Outbound messages queue size. + */ + public void setOutboundMessagesQueueSize(int outMesQueueSize) { + this.outMesQueueSize = outMesQueueSize; + } + + private static int cpus(Map<String, Collection<ClusterNode>> neighborhood) { + int cpus = 0; + + for (Collection<ClusterNode> nodes : neighborhood.values()) { + ClusterNode first = F.first(nodes); + + // Projection can be empty if all nodes in it failed. + if (first != null) + cpus += first.metrics().getTotalCpus(); + } + + return cpus; + } + + private static int gcCpus(Map<String, Collection<ClusterNode>> neighborhood) { + int cpus = 0; + + for (Collection<ClusterNode> nodes : neighborhood.values()) { + ClusterNode first = F.first(nodes); + + // Projection can be empty if all nodes in it failed. + if (first != null) + cpus += first.metrics().getCurrentGcCpuLoad(); + } + + return cpus; + } + + /** + * Gets the oldest node in given collection. + * + * @param nodes Nodes. + * @return Oldest node or {@code null} if collection is empty. + */ + @Nullable private static ClusterNode oldest(Collection<ClusterNode> nodes) { + long min = Long.MAX_VALUE; + + ClusterNode oldest = null; + + for (ClusterNode n : nodes) + if (n.order() < min) { + min = n.order(); + oldest = n; + } + + return oldest; + } + + /** + * Serializes node metrics into byte array. + * + * @param data Byte array. + * @param off Offset into byte array. + * @param metrics Node metrics to serialize. + * @return New offset. + */ + public static int serialize(byte[] data, int off, ClusterMetrics metrics) { + int start = off; + + off = U.intToBytes(metrics.getMaximumActiveJobs(), data, off); + off = U.intToBytes(metrics.getCurrentActiveJobs(), data, off); + off = U.floatToBytes(metrics.getAverageActiveJobs(), data, off); + off = U.intToBytes(metrics.getMaximumWaitingJobs(), data, off); + off = U.intToBytes(metrics.getCurrentWaitingJobs(), data, off); + off = U.floatToBytes(metrics.getAverageWaitingJobs(), data, off); + off = U.intToBytes(metrics.getMaximumRejectedJobs(), data, off); + off = U.intToBytes(metrics.getCurrentRejectedJobs(), data, off); + off = U.floatToBytes(metrics.getAverageRejectedJobs(), data, off); + off = U.intToBytes(metrics.getMaximumCancelledJobs(), data, off); + off = U.intToBytes(metrics.getCurrentCancelledJobs(), data, off); + off = U.floatToBytes(metrics.getAverageCancelledJobs(), data, off); + off = U.intToBytes(metrics.getTotalRejectedJobs(), data , off); + off = U.intToBytes(metrics.getTotalCancelledJobs(), data , off); + off = U.intToBytes(metrics.getTotalExecutedJobs(), data , off); + off = U.longToBytes(metrics.getMaximumJobWaitTime(), data, off); + off = U.longToBytes(metrics.getCurrentJobWaitTime(), data, off); + off = U.doubleToBytes(metrics.getAverageJobWaitTime(), data, off); + off = U.longToBytes(metrics.getMaximumJobExecuteTime(), data, off); + off = U.longToBytes(metrics.getCurrentJobExecuteTime(), data, off); + off = U.doubleToBytes(metrics.getAverageJobExecuteTime(), data, off); + off = U.intToBytes(metrics.getTotalExecutedTasks(), data, off); + off = U.longToBytes(metrics.getCurrentIdleTime(), data, off); + off = U.longToBytes(metrics.getTotalIdleTime(), data , off); + off = U.intToBytes(metrics.getTotalCpus(), data, off); + off = U.doubleToBytes(metrics.getCurrentCpuLoad(), data, off); + off = U.doubleToBytes(metrics.getAverageCpuLoad(), data, off); + off = U.doubleToBytes(metrics.getCurrentGcCpuLoad(), data, off); + off = U.longToBytes(metrics.getHeapMemoryInitialized(), data, off); + off = U.longToBytes(metrics.getHeapMemoryUsed(), data, off); + off = U.longToBytes(metrics.getHeapMemoryCommitted(), data, off); + off = U.longToBytes(metrics.getHeapMemoryMaximum(), data, off); + off = U.longToBytes(metrics.getNonHeapMemoryInitialized(), data, off); + off = U.longToBytes(metrics.getNonHeapMemoryUsed(), data, off); + off = U.longToBytes(metrics.getNonHeapMemoryCommitted(), data, off); + off = U.longToBytes(metrics.getNonHeapMemoryMaximum(), data, off); + off = U.longToBytes(metrics.getStartTime(), data, off); + off = U.longToBytes(metrics.getNodeStartTime(), data, off); + off = U.longToBytes(metrics.getUpTime(), data, off); + off = U.intToBytes(metrics.getCurrentThreadCount(), data, off); + off = U.intToBytes(metrics.getMaximumThreadCount(), data, off); + off = U.longToBytes(metrics.getTotalStartedThreadCount(), data, off); + off = U.intToBytes(metrics.getCurrentDaemonThreadCount(), data, off); + off = U.longToBytes(metrics.getLastDataVersion(), data, off); + off = U.intToBytes(metrics.getSentMessagesCount(), data, off); + off = U.longToBytes(metrics.getSentBytesCount(), data, off); + off = U.intToBytes(metrics.getReceivedMessagesCount(), data, off); + off = U.longToBytes(metrics.getReceivedBytesCount(), data, off); + off = U.intToBytes(metrics.getOutboundMessagesQueueSize(), data, off); + + assert off - start == METRICS_SIZE : "Invalid metrics size [expected=" + METRICS_SIZE + ", actual=" + + (off - start) + ']'; + + return off; + } + + /** + * De-serializes node metrics. + * + * @param data Byte array. + * @param off Offset into byte array. + * @return Deserialized node metrics. + */ + public static ClusterMetrics deserialize(byte[] data, int off) { + int start = off; + + ClusterMetricsSnapshot metrics = new ClusterMetricsSnapshot(); + + metrics.setLastUpdateTime(U.currentTimeMillis()); + + metrics.setMaximumActiveJobs(U.bytesToInt(data, off)); + + off += 4; + + metrics.setCurrentActiveJobs(U.bytesToInt(data, off)); + + off += 4; + + metrics.setAverageActiveJobs(U.bytesToFloat(data, off)); + + off += 4; + + metrics.setMaximumWaitingJobs(U.bytesToInt(data, off)); + + off += 4; + + metrics.setCurrentWaitingJobs(U.bytesToInt(data, off)); + + off += 4; + + metrics.setAverageWaitingJobs(U.bytesToFloat(data, off)); + + off += 4; + + metrics.setMaximumRejectedJobs(U.bytesToInt(data, off)); + + off += 4; + + metrics.setCurrentRejectedJobs(U.bytesToInt(data, off)); + + off += 4; + + metrics.setAverageRejectedJobs(U.bytesToFloat(data, off)); + + off += 4; + + metrics.setMaximumCancelledJobs(U.bytesToInt(data, off)); + + off += 4; + + metrics.setCurrentCancelledJobs(U.bytesToInt(data, off)); + + off += 4; + + metrics.setAverageCancelledJobs(U.bytesToFloat(data, off)); + + off += 4; + + metrics.setTotalRejectedJobs(U.bytesToInt(data, off)); + + off += 4; + + metrics.setTotalCancelledJobs(U.bytesToInt(data, off)); + + off += 4; + + metrics.setTotalExecutedJobs(U.bytesToInt(data, off)); + + off += 4; + + metrics.setMaximumJobWaitTime(U.bytesToLong(data, off)); + + off += 8; + + metrics.setCurrentJobWaitTime(U.bytesToLong(data, off)); + + off += 8; + + metrics.setAverageJobWaitTime(U.bytesToDouble(data, off)); + + off += 8; + + metrics.setMaximumJobExecuteTime(U.bytesToLong(data, off)); + + off += 8; + + metrics.setCurrentJobExecuteTime(U.bytesToLong(data, off)); + + off += 8; + + metrics.setAverageJobExecuteTime(U.bytesToDouble(data, off)); + + off += 8; + + metrics.setTotalExecutedTasks(U.bytesToInt(data, off)); + + off += 4; + + metrics.setCurrentIdleTime(U.bytesToLong(data, off)); + + off += 8; + + metrics.setTotalIdleTime(U.bytesToLong(data, off)); + + off += 8; + + metrics.setAvailableProcessors(U.bytesToInt(data, off)); + + off += 4; + + metrics.setCurrentCpuLoad(U.bytesToDouble(data, off)); + + off += 8; + + metrics.setAverageCpuLoad(U.bytesToDouble(data, off)); + + off += 8; + + metrics.setCurrentGcCpuLoad(U.bytesToDouble(data, off)); + + off += 8; + + metrics.setHeapMemoryInitialized(U.bytesToLong(data, off)); + + off += 8; + + metrics.setHeapMemoryUsed(U.bytesToLong(data, off)); + + off += 8; + + metrics.setHeapMemoryCommitted(U.bytesToLong(data, off)); + + off += 8; + + metrics.setHeapMemoryMaximum(U.bytesToLong(data, off)); + + off += 8; + + metrics.setNonHeapMemoryInitialized(U.bytesToLong(data, off)); + + off += 8; + + metrics.setNonHeapMemoryUsed(U.bytesToLong(data, off)); + + off += 8; + + metrics.setNonHeapMemoryCommitted(U.bytesToLong(data, off)); + + off += 8; + + metrics.setNonHeapMemoryMaximum(U.bytesToLong(data, off)); + + off += 8; + + metrics.setStartTime(U.bytesToLong(data, off)); + + off += 8; + + metrics.setNodeStartTime(U.bytesToLong(data, off)); + + off += 8; + + metrics.setUpTime(U.bytesToLong(data, off)); + + off += 8; + + metrics.setCurrentThreadCount(U.bytesToInt(data, off)); + + off += 4; + + metrics.setMaximumThreadCount(U.bytesToInt(data, off)); + + off += 4; + + metrics.setTotalStartedThreadCount(U.bytesToLong(data, off)); + + off += 8; + + metrics.setCurrentDaemonThreadCount(U.bytesToInt(data, off)); + + off += 4; + + metrics.setLastDataVersion(U.bytesToLong(data, off)); + + off += 8; + + metrics.setSentMessagesCount(U.bytesToInt(data, off)); + + off += 4; + + metrics.setSentBytesCount(U.bytesToLong(data, off)); + + off += 8; + + metrics.setReceivedMessagesCount(U.bytesToInt(data, off)); + + off += 4; + + metrics.setReceivedBytesCount(U.bytesToLong(data, off)); + + off += 8; + + metrics.setOutboundMessagesQueueSize(U.bytesToInt(data, off)); + + off += 4; + + assert off - start == METRICS_SIZE : "Invalid metrics size [expected=" + METRICS_SIZE + ", actual=" + + (off - start) + ']'; + + return metrics; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return System.identityHashCode(this); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + if (this == obj) + return true; + + if (obj == null || getClass() != obj.getClass()) + return false; + + ClusterMetricsSnapshot other = (ClusterMetricsSnapshot)obj; + + return + availProcs == other.availProcs && + curActiveJobs == other.curActiveJobs && + curCancelledJobs == other.curCancelledJobs && + curIdleTime == other.curIdleTime && + curJobExecTime == other.curJobExecTime && + curJobWaitTime == other.curJobWaitTime && + curRejectedJobs == other.curRejectedJobs && + curWaitingJobs == other.curWaitingJobs && + daemonThreadCnt == other.daemonThreadCnt && + heapCommitted == other.heapCommitted && + heapInit == other.heapInit && + heapMax == other.heapMax && + heapUsed == other.heapUsed && + maxActiveJobs == other.maxActiveJobs && + maxCancelledJobs == other.maxCancelledJobs && + maxJobExecTime == other.maxJobExecTime && + maxJobWaitTime == other.maxJobWaitTime && + maxRejectedJobs == other.maxRejectedJobs && + maxWaitingJobs == other.maxWaitingJobs && + nonHeapCommitted == other.nonHeapCommitted && + nonHeapInit == other.nonHeapInit && + nonHeapMax == other.nonHeapMax && + nonHeapUsed == other.nonHeapUsed && + peakThreadCnt == other.peakThreadCnt && + rcvdBytesCnt == other.rcvdBytesCnt && + outMesQueueSize == other.outMesQueueSize && + rcvdMsgsCnt == other.rcvdMsgsCnt && + sentBytesCnt == other.sentBytesCnt && + sentMsgsCnt == other.sentMsgsCnt && + startTime == other.startTime && + nodeStartTime == other.nodeStartTime && + startedThreadCnt == other.startedThreadCnt && + threadCnt == other.threadCnt && + totalCancelledJobs == other.totalCancelledJobs && + totalExecutedJobs == other.totalExecutedJobs && + totalExecTasks == other.totalExecTasks && + totalIdleTime == other.totalIdleTime && + totalRejectedJobs == other.totalRejectedJobs && + upTime == other.upTime; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ClusterMetricsSnapshot.class, this); + } +}