This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch 3.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/3.1 by this push: new 02193118d4 Improvements to FateMetrics (#4924) 02193118d4 is described below commit 02193118d4bf39e43bcafd66ff8fa858bfcd0c74 Author: Dom G. <domgargu...@apache.org> AuthorDate: Wed Sep 25 15:07:42 2024 -0400 Improvements to FateMetrics (#4924) * Improvements to FateMetrics --- .../manager/metrics/fate/FateMetricValues.java | 26 +++--- .../accumulo/manager/metrics/fate/FateMetrics.java | 102 +++++++-------------- 2 files changed, 47 insertions(+), 81 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetricValues.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetricValues.java index 57a561aa7b..702eaa7e56 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetricValues.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetricValues.java @@ -19,12 +19,14 @@ package org.apache.accumulo.manager.metrics.fate; import java.util.Collections; +import java.util.EnumMap; import java.util.List; import java.util.Map; import java.util.TreeMap; import org.apache.accumulo.core.fate.AdminUtil; import org.apache.accumulo.core.fate.ReadOnlyTStore; +import org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus; import org.apache.accumulo.server.ServerContext; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.Stat; @@ -44,12 +46,12 @@ class FateMetricValues { private final long zkFateChildOpsTotal; private final long zkConnectionErrors; - private final Map<String,Long> txStateCounters; + private final EnumMap<TStatus,Long> txStateCounters; private final Map<String,Long> opTypeCounters; private FateMetricValues(final long updateTime, final long currentFateOps, final long zkFateChildOpsTotal, final long zkConnectionErrors, - final Map<String,Long> txStateCounters, final Map<String,Long> opTypeCounters) { + final EnumMap<TStatus,Long> 
txStateCounters, final Map<String,Long> opTypeCounters) { this.updateTime = updateTime; this.currentFateOps = currentFateOps; this.zkFateChildOpsTotal = zkFateChildOpsTotal; @@ -75,7 +77,7 @@ class FateMetricValues { * * @return a map of transaction status counters. */ - Map<String,Long> getTxStateCounters() { + EnumMap<TStatus,Long> getTxStateCounters() { return txStateCounters; } @@ -115,9 +117,9 @@ class FateMetricValues { builder.withCurrentFateOps(currFates.size()); // states are enumerated - create new map with counts initialized to 0. - Map<String,Long> states = new TreeMap<>(); - for (ReadOnlyTStore.TStatus t : ReadOnlyTStore.TStatus.values()) { - states.put(t.name(), 0L); + EnumMap<TStatus,Long> states = new EnumMap<>(TStatus.class); + for (TStatus t : TStatus.values()) { + states.put(t, 0L); } // op types are dynamic, no count initialization needed - clearing prev values will @@ -126,7 +128,7 @@ class FateMetricValues { for (AdminUtil.TransactionStatus tx : currFates) { - String stateName = tx.getStatus().name(); + TStatus stateName = tx.getStatus(); // incr count for state states.merge(stateName, 1L, Long::sum); @@ -182,15 +184,15 @@ class FateMetricValues { private long zkFateChildOpsTotal = 0; private long zkConnectionErrors = 0; - private final Map<String,Long> txStateCounters; + private final EnumMap<TStatus,Long> txStateCounters; private Map<String,Long> opTypeCounters; Builder() { // states are enumerated - create new map with counts initialized to 0. 
- txStateCounters = new TreeMap<>(); - for (ReadOnlyTStore.TStatus t : ReadOnlyTStore.TStatus.values()) { - txStateCounters.put(t.name(), 0L); + txStateCounters = new EnumMap<>(TStatus.class); + for (TStatus t : TStatus.values()) { + txStateCounters.put(t, 0L); } opTypeCounters = Collections.emptyMap(); @@ -216,7 +218,7 @@ class FateMetricValues { return this; } - Builder withTxStateCounters(final Map<String,Long> txStateCounters) { + Builder withTxStateCounters(final EnumMap<TStatus,Long> txStateCounters) { this.txStateCounters.putAll(txStateCounters); return this; } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java index ebbbec4316..18e376e48c 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java @@ -24,7 +24,7 @@ import static org.apache.accumulo.core.metrics.Metric.FATE_OPS_ACTIVITY; import static org.apache.accumulo.core.metrics.Metric.FATE_TX; import static org.apache.accumulo.core.metrics.Metric.FATE_TYPE_IN_PROGRESS; -import java.util.List; +import java.util.EnumMap; import java.util.Map.Entry; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.fate.ReadOnlyTStore; +import org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus; import org.apache.accumulo.core.fate.ZooStore; import org.apache.accumulo.core.metrics.MetricsProducer; import org.apache.accumulo.core.util.threads.ThreadPools; @@ -41,9 +42,9 @@ import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.MeterRegistry; 
import io.micrometer.core.instrument.Metrics; -import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.Tags; public class FateMetrics implements MetricsProducer { @@ -60,16 +61,10 @@ public class FateMetrics implements MetricsProducer { private final String fateRootPath; private final long refreshDelay; - private AtomicLong totalCurrentOpsGauge; - private AtomicLong totalOpsGauge; - private AtomicLong fateErrorsGauge; - private AtomicLong newTxGauge; - private AtomicLong submittedTxGauge; - private AtomicLong inProgressTxGauge; - private AtomicLong failedInProgressTxGauge; - private AtomicLong failedTxGauge; - private AtomicLong successfulTxGauge; - private AtomicLong unknownTxGauge; + private final AtomicLong totalCurrentOpsCount = new AtomicLong(0); + private final AtomicLong totalOpsCount = new AtomicLong(0); + private final AtomicLong fateErrorsCount = new AtomicLong(0); + private final EnumMap<TStatus,AtomicLong> txStatusCounters = new EnumMap<>(TStatus.class); public FateMetrics(final ServerContext context, final long minimumRefreshDelay) { @@ -88,6 +83,10 @@ public class FateMetrics implements MetricsProducer { "FATE Metrics - Interrupt received while initializing zoo store"); } + for (TStatus status : TStatus.values()) { + txStatusCounters.put(status, new AtomicLong(0)); + } + } private void update() { @@ -95,70 +94,35 @@ public class FateMetrics implements MetricsProducer { FateMetricValues metricValues = FateMetricValues.getFromZooKeeper(context, fateRootPath, zooStore); - totalCurrentOpsGauge.set(metricValues.getCurrentFateOps()); - totalOpsGauge.set(metricValues.getZkFateChildOpsTotal()); - fateErrorsGauge.set(metricValues.getZkConnectionErrors()); - - for (Entry<String,Long> vals : metricValues.getTxStateCounters().entrySet()) { - switch (ReadOnlyTStore.TStatus.valueOf(vals.getKey())) { - case NEW: - newTxGauge.set(vals.getValue()); - break; - case SUBMITTED: - submittedTxGauge.set(vals.getValue()); - break; - case IN_PROGRESS: - 
inProgressTxGauge.set(vals.getValue()); - break; - case FAILED_IN_PROGRESS: - failedInProgressTxGauge.set(vals.getValue()); - break; - case FAILED: - failedTxGauge.set(vals.getValue()); - break; - case SUCCESSFUL: - successfulTxGauge.set(vals.getValue()); - break; - case UNKNOWN: - unknownTxGauge.set(vals.getValue()); - break; - default: - log.warn("Unhandled status type: {}", vals.getKey()); + totalCurrentOpsCount.set(metricValues.getCurrentFateOps()); + totalOpsCount.set(metricValues.getZkFateChildOpsTotal()); + fateErrorsCount.set(metricValues.getZkConnectionErrors()); + + for (Entry<TStatus,Long> entry : metricValues.getTxStateCounters().entrySet()) { + AtomicLong counter = txStatusCounters.get(entry.getKey()); + if (counter != null) { + counter.set(entry.getValue()); + } else { + log.warn("Unhandled TStatus: {}", entry.getKey()); } } - metricValues.getOpTypeCounters().forEach((name, count) -> { - Metrics.gauge(FATE_TYPE_IN_PROGRESS.getName(), Tags.of(OP_TYPE_TAG, name), count); - }); + metricValues.getOpTypeCounters().forEach((name, count) -> Metrics + .gauge(FATE_TYPE_IN_PROGRESS.getName(), Tags.of(OP_TYPE_TAG, name), count)); } @Override public void registerMetrics(final MeterRegistry registry) { - totalCurrentOpsGauge = registry.gauge(FATE_OPS.getName(), new AtomicLong(0)); - totalOpsGauge = registry.gauge(FATE_OPS_ACTIVITY.getName(), new AtomicLong(0)); - fateErrorsGauge = registry.gauge(FATE_ERRORS.getName(), - List.of(Tag.of("type", "zk.connection")), new AtomicLong(0)); - newTxGauge = registry.gauge(FATE_TX.getName(), - List.of(Tag.of("state", ReadOnlyTStore.TStatus.NEW.name().toLowerCase())), - new AtomicLong(0)); - submittedTxGauge = registry.gauge(FATE_TX.getName(), - List.of(Tag.of("state", ReadOnlyTStore.TStatus.SUBMITTED.name().toLowerCase())), - new AtomicLong(0)); - inProgressTxGauge = registry.gauge(FATE_TX.getName(), - List.of(Tag.of("state", ReadOnlyTStore.TStatus.IN_PROGRESS.name().toLowerCase())), - new AtomicLong(0)); - 
failedInProgressTxGauge = registry.gauge(FATE_TX.getName(), - List.of(Tag.of("state", ReadOnlyTStore.TStatus.FAILED_IN_PROGRESS.name().toLowerCase())), - new AtomicLong(0)); - failedTxGauge = registry.gauge(FATE_TX.getName(), - List.of(Tag.of("state", ReadOnlyTStore.TStatus.FAILED.name().toLowerCase())), - new AtomicLong(0)); - successfulTxGauge = registry.gauge(FATE_TX.getName(), - List.of(Tag.of("state", ReadOnlyTStore.TStatus.SUCCESSFUL.name().toLowerCase())), - new AtomicLong(0)); - unknownTxGauge = registry.gauge(FATE_TX.getName(), - List.of(Tag.of("state", ReadOnlyTStore.TStatus.UNKNOWN.name().toLowerCase())), - new AtomicLong(0)); + Gauge.builder(FATE_OPS.getName(), totalCurrentOpsCount, AtomicLong::get) + .description(FATE_OPS.getDescription()).register(registry); + Gauge.builder(FATE_OPS_ACTIVITY.getName(), totalOpsCount, AtomicLong::get) + .description(FATE_OPS_ACTIVITY.getDescription()).register(registry); + Gauge.builder(FATE_ERRORS.getName(), fateErrorsCount, AtomicLong::get) + .description(FATE_ERRORS.getDescription()).tags("type", "zk.connection").register(registry); + + txStatusCounters.forEach((status, counter) -> Gauge + .builder(FATE_TX.getName(), counter, AtomicLong::get).description(FATE_TX.getDescription()) + .tags("state", status.name().toLowerCase()).register(registry)); update();