This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new ab8e0e3b73 Add metrics for User fate transactions (#4771) ab8e0e3b73 is described below commit ab8e0e3b73af2d77e42fddefe5058e5b0ec5e55f Author: Christopher L. Shannon <cshan...@apache.org> AuthorDate: Sat Jul 27 15:35:25 2024 -0400 Add metrics for User fate transactions (#4771) * Add fate metrics for User fate transactions This commit updates the fate metrics that are published to include the user fate transactions and not just Meta (zookeeper). The metric names are the same and a tag is now added to the metrics to indicate the fate instance type of either user or meta. This closes #4534 * update test --- .../accumulo/manager/metrics/ManagerMetrics.java | 15 +- .../manager/metrics/fate/FateMetricValues.java | 180 ++++++--------------- .../accumulo/manager/metrics/fate/FateMetrics.java | 103 ++++++------ .../metrics/fate/meta/MetaFateMetricValues.java | 144 +++++++++++++++++ .../manager/metrics/fate/meta/MetaFateMetrics.java | 81 ++++++++++ .../metrics/fate/user/UserFateMetricValues.java | 63 ++++++++ .../metrics/fate/user/UserFateMetrics.java} | 39 ++--- .../MetaFateMetricValuesTest.java} | 10 +- .../apache/accumulo/test/metrics/MetricsIT.java | 31 +++- 9 files changed, 448 insertions(+), 218 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ManagerMetrics.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ManagerMetrics.java index 8215e2fdb1..2869e2d7e8 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ManagerMetrics.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ManagerMetrics.java @@ -30,12 +30,14 @@ import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.metrics.MetricsProducer; import org.apache.accumulo.manager.Manager; import org.apache.accumulo.manager.metrics.fate.FateMetrics; +import 
org.apache.accumulo.manager.metrics.fate.meta.MetaFateMetrics; +import org.apache.accumulo.manager.metrics.fate.user.UserFateMetrics; import io.micrometer.core.instrument.MeterRegistry; public class ManagerMetrics implements MetricsProducer { - private final FateMetrics fateMetrics; + private final List<FateMetrics<?>> fateMetrics; private final AtomicLong rootTGWErrorsGauge = new AtomicLong(0); private final AtomicLong metadataTGWErrorsGauge = new AtomicLong(0); @@ -44,8 +46,11 @@ public class ManagerMetrics implements MetricsProducer { public ManagerMetrics(final AccumuloConfiguration conf, final Manager manager) { requireNonNull(conf, "AccumuloConfiguration must not be null"); requireNonNull(conf, "Manager must not be null"); - fateMetrics = new FateMetrics(manager.getContext(), - conf.getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL)); + fateMetrics = List.of( + new MetaFateMetrics(manager.getContext(), + conf.getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL)), + new UserFateMetrics(manager.getContext(), + conf.getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL))); } public void incrementTabletGroupWatcherError(DataLevel level) { @@ -66,7 +71,7 @@ public class ManagerMetrics implements MetricsProducer { @Override public void registerMetrics(MeterRegistry registry) { - fateMetrics.registerMetrics(registry); + fateMetrics.forEach(fm -> fm.registerMetrics(registry)); registry.gauge(METRICS_MANAGER_ROOT_TGW_ERRORS, rootTGWErrorsGauge); registry.gauge(METRICS_MANAGER_META_TGW_ERRORS, metadataTGWErrorsGauge); registry.gauge(METRICS_MANAGER_USER_TGW_ERRORS, userTGWErrorsGauge); @@ -75,7 +80,7 @@ public class ManagerMetrics implements MetricsProducer { public List<MetricsProducer> getProducers(AccumuloConfiguration conf, Manager manager) { ArrayList<MetricsProducer> producers = new ArrayList<>(); producers.add(this); - producers.add(fateMetrics); + producers.addAll(fateMetrics); 
producers.add(manager.getCompactionCoordinator()); return producers; } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetricValues.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetricValues.java index 9850e2c7b9..fb847afd17 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetricValues.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetricValues.java @@ -24,11 +24,7 @@ import java.util.Map; import java.util.TreeMap; import org.apache.accumulo.core.fate.AdminUtil; -import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.ReadOnlyFateStore; -import org.apache.accumulo.server.ServerContext; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,47 +32,34 @@ import org.slf4j.LoggerFactory; * Immutable class that holds a snapshot of fate metric values - use builder to instantiate an * instance. 
*/ -class FateMetricValues { +public abstract class FateMetricValues { private static final Logger log = LoggerFactory.getLogger(FateMetricValues.class); - private final long updateTime; - private final long currentFateOps; - private final long zkFateChildOpsTotal; - private final long zkConnectionErrors; + protected final long updateTime; + protected final long currentFateOps; - private final Map<String,Long> txStateCounters; - private final Map<String,Long> opTypeCounters; + protected final Map<String,Long> txStateCounters; + protected final Map<String,Long> opTypeCounters; - private FateMetricValues(final long updateTime, final long currentFateOps, - final long zkFateChildOpsTotal, final long zkConnectionErrors, + protected FateMetricValues(final long updateTime, final long currentFateOps, final Map<String,Long> txStateCounters, final Map<String,Long> opTypeCounters) { this.updateTime = updateTime; this.currentFateOps = currentFateOps; - this.zkFateChildOpsTotal = zkFateChildOpsTotal; - this.zkConnectionErrors = zkConnectionErrors; this.txStateCounters = txStateCounters; this.opTypeCounters = opTypeCounters; } - long getCurrentFateOps() { + public long getCurrentFateOps() { return currentFateOps; } - long getZkFateChildOpsTotal() { - return zkFateChildOpsTotal; - } - - long getZkConnectionErrors() { - return zkConnectionErrors; - } - /** * Provides counters for transaction states (NEW, IN_PROGRESS, FAILED,...). * * @return a map of transaction status counters. */ - Map<String,Long> getTxStateCounters() { + public Map<String,Long> getTxStateCounters() { return txStateCounters; } @@ -87,107 +70,62 @@ class FateMetricValues { * * @return a map of operation type counters. */ - Map<String,Long> getOpTypeCounters() { + public Map<String,Long> getOpTypeCounters() { return opTypeCounters; } - /** - * The FATE transaction stores the transaction type as a debug string in the transaction zknode. 
- * This method returns a map of counters of the current occurrences of each operation type that is - * IN_PROGRESS - * - * @param context Accumulo context - * @param fateRootPath the zookeeper path to fate info - * @param metaFateStore a readonly MetaFateStore - * @return the current FATE metric values. - */ - public static FateMetricValues getFromZooKeeper(final ServerContext context, - final String fateRootPath, final ReadOnlyFateStore<FateMetrics> metaFateStore) { - - FateMetricValues.Builder builder = FateMetricValues.builder(); + protected static <T extends AbstractBuilder<T,U>,U extends FateMetricValues> T + getFateMetrics(final ReadOnlyFateStore<FateMetrics<U>> fateStore, T builder) { - AdminUtil<FateMetrics> admin = new AdminUtil<>(false); + AdminUtil<FateMetrics<U>> admin = new AdminUtil<>(false); - try { + List<AdminUtil.TransactionStatus> currFates = + admin.getTransactionStatus(Map.of(fateStore.type(), fateStore), null, null, null); - List<AdminUtil.TransactionStatus> currFates = admin - .getTransactionStatus(Map.of(FateInstanceType.META, metaFateStore), null, null, null); + builder.withCurrentFateOps(currFates.size()); - builder.withCurrentFateOps(currFates.size()); - - // states are enumerated - create new map with counts initialized to 0. - Map<String,Long> states = new TreeMap<>(); - for (ReadOnlyFateStore.TStatus t : ReadOnlyFateStore.TStatus.values()) { - states.put(t.name(), 0L); - } + // states are enumerated - create new map with counts initialized to 0. + Map<String,Long> states = new TreeMap<>(); + for (ReadOnlyFateStore.TStatus t : ReadOnlyFateStore.TStatus.values()) { + states.put(t.name(), 0L); + } - // op types are dynamic, no count initialization needed - clearing prev values will - // need to be handled by the caller - this is just the counts for current op types. 
- Map<String,Long> opTypeCounters = new TreeMap<>(); + // op types are dynamic, no count initialization needed - clearing prev values will + // need to be handled by the caller - this is just the counts for current op types. + Map<String,Long> opTypeCounters = new TreeMap<>(); - for (AdminUtil.TransactionStatus tx : currFates) { + for (AdminUtil.TransactionStatus tx : currFates) { - String stateName = tx.getStatus().name(); + String stateName = tx.getStatus().name(); - // incr count for state - states.merge(stateName, 1L, Long::sum); + // incr count for state + states.merge(stateName, 1L, Long::sum); - // incr count for op type for for in_progress transactions. - if (ReadOnlyFateStore.TStatus.IN_PROGRESS.equals(tx.getStatus())) { - String opType = tx.getTxName(); - if (opType == null || opType.isEmpty()) { - opType = "UNKNOWN"; - } - opTypeCounters.merge(opType, 1L, Long::sum); + // incr count for op type for for in_progress transactions. + if (ReadOnlyFateStore.TStatus.IN_PROGRESS.equals(tx.getStatus())) { + String opType = tx.getTxName(); + if (opType == null || opType.isEmpty()) { + opType = "UNKNOWN"; } + opTypeCounters.merge(opType, 1L, Long::sum); } - - builder.withTxStateCounters(states); - builder.withOpTypeCounters(opTypeCounters); - - Stat node = context.getZooReaderWriter().getZooKeeper().exists(fateRootPath, false); - builder.withZkFateChildOpsTotal(node.getCversion()); - - if (log.isTraceEnabled()) { - log.trace( - "ZkNodeStat: {czxid: {}, mzxid: {}, pzxid: {}, ctime: {}, mtime: {}, " - + "version: {}, cversion: {}, num children: {}", - node.getCzxid(), node.getMzxid(), node.getPzxid(), node.getCtime(), node.getMtime(), - node.getVersion(), node.getCversion(), node.getNumChildren()); - } - - } catch (KeeperException ex) { - log.debug("Error connecting to ZooKeeper", ex); - builder.incrZkConnectionErrors(); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); } - return builder.build(); - } - - @Override - public String toString() 
{ - return "FateMetricValues{updateTime=" + updateTime + ", currentFateOps=" + currentFateOps - + ", zkFateChildOpsTotal=" + zkFateChildOpsTotal + ", zkConnectionErrors=" - + zkConnectionErrors + '}'; - } + builder.withTxStateCounters(states); + builder.withOpTypeCounters(opTypeCounters); - public static Builder builder() { - return new Builder(); + return builder; } - static class Builder { - - private long currentFateOps = 0; - private long zkFateChildOpsTotal = 0; - private long zkConnectionErrors = 0; + @SuppressWarnings("unchecked") + protected static abstract class AbstractBuilder<T extends AbstractBuilder<T,U>, + U extends FateMetricValues> { - private final Map<String,Long> txStateCounters; - private Map<String,Long> opTypeCounters; - - Builder() { + protected long currentFateOps = 0; + protected final Map<String,Long> txStateCounters; + protected Map<String,Long> opTypeCounters; + protected AbstractBuilder() { // states are enumerated - create new map with counts initialized to 0. 
txStateCounters = new TreeMap<>(); for (ReadOnlyFateStore.TStatus t : ReadOnlyFateStore.TStatus.values()) { @@ -197,39 +135,21 @@ class FateMetricValues { opTypeCounters = Collections.emptyMap(); } - Builder withCurrentFateOps(final long value) { + public T withCurrentFateOps(final long value) { this.currentFateOps = value; - return this; - } - - Builder withZkFateChildOpsTotal(final long value) { - this.zkFateChildOpsTotal = value; - return this; - } - - Builder incrZkConnectionErrors() { - this.zkConnectionErrors += 1L; - return this; - } - - Builder withZkConnectionErrors(final long value) { - this.zkConnectionErrors = value; - return this; + return (T) this; } - Builder withTxStateCounters(final Map<String,Long> txStateCounters) { + public T withTxStateCounters(final Map<String,Long> txStateCounters) { this.txStateCounters.putAll(txStateCounters); - return this; + return (T) this; } - Builder withOpTypeCounters(final Map<String,Long> opTypeCounters) { + public T withOpTypeCounters(final Map<String,Long> opTypeCounters) { this.opTypeCounters = new TreeMap<>(opTypeCounters); - return this; + return (T) this; } - FateMetricValues build() { - return new FateMetricValues(System.currentTimeMillis(), currentFateOps, zkFateChildOpsTotal, - zkConnectionErrors, txStateCounters, opTypeCounters); - } + protected abstract U build(); } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java index e4e5150671..c9d704e374 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java @@ -20,18 +20,16 @@ package org.apache.accumulo.manager.metrics.fate; import java.util.List; import java.util.Map.Entry; +import java.util.Objects; import java.util.concurrent.ScheduledExecutorService; import 
java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import org.apache.accumulo.core.Constants; -import org.apache.accumulo.core.fate.MetaFateStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore; import org.apache.accumulo.core.metrics.MetricsProducer; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.server.ServerContext; -import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,7 +38,7 @@ import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.Tags; -public class FateMetrics implements MetricsProducer { +public abstract class FateMetrics<T extends FateMetricValues> implements MetricsProducer { private static final Logger log = LoggerFactory.getLogger(FateMetrics.class); @@ -49,48 +47,35 @@ public class FateMetrics implements MetricsProducer { private static final String OP_TYPE_TAG = "op.type"; - private final ServerContext context; - private final ReadOnlyFateStore<FateMetrics> fateStore; - private final String fateRootPath; - private final long refreshDelay; - - private AtomicLong totalCurrentOpsGauge = new AtomicLong(0); - private AtomicLong totalOpsGauge = new AtomicLong(0); - private AtomicLong fateErrorsGauge = new AtomicLong(0); - private AtomicLong newTxGauge = new AtomicLong(0); - private AtomicLong submittedTxGauge = new AtomicLong(0); - private AtomicLong inProgressTxGauge = new AtomicLong(0); - private AtomicLong failedInProgressTxGauge = new AtomicLong(0); - private AtomicLong failedTxGauge = new AtomicLong(0); - private AtomicLong successfulTxGauge = new AtomicLong(0); - private AtomicLong unknownTxGauge = new AtomicLong(0); + protected final ServerContext context; + protected final ReadOnlyFateStore<FateMetrics<T>> fateStore; + protected final long refreshDelay; - public FateMetrics(final ServerContext context, final 
long minimumRefreshDelay) { + protected final AtomicLong totalCurrentOpsGauge = new AtomicLong(0); + protected final AtomicLong newTxGauge = new AtomicLong(0); + protected final AtomicLong submittedTxGauge = new AtomicLong(0); + protected final AtomicLong inProgressTxGauge = new AtomicLong(0); + protected final AtomicLong failedInProgressTxGauge = new AtomicLong(0); + protected final AtomicLong failedTxGauge = new AtomicLong(0); + protected final AtomicLong successfulTxGauge = new AtomicLong(0); + protected final AtomicLong unknownTxGauge = new AtomicLong(0); + public FateMetrics(final ServerContext context, final long minimumRefreshDelay) { this.context = context; - this.fateRootPath = context.getZooKeeperRoot() + Constants.ZFATE; this.refreshDelay = Math.max(DEFAULT_MIN_REFRESH_DELAY, minimumRefreshDelay); + this.fateStore = Objects.requireNonNull(buildStore(context)); + } - try { - this.fateStore = new MetaFateStore<>(fateRootPath, context.getZooReaderWriter()); - } catch (KeeperException ex) { - throw new IllegalStateException( - "FATE Metrics - Failed to create zoo store - metrics unavailable", ex); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - throw new IllegalStateException( - "FATE Metrics - Interrupt received while initializing zoo store"); - } + protected abstract ReadOnlyFateStore<FateMetrics<T>> buildStore(ServerContext context); - } + protected abstract T getMetricValues(); - private void update() { - FateMetricValues metricValues = - FateMetricValues.getFromZooKeeper(context, fateRootPath, fateStore); + protected void update() { + update(getMetricValues()); + } + protected void update(T metricValues) { totalCurrentOpsGauge.set(metricValues.getCurrentFateOps()); - totalOpsGauge.set(metricValues.getZkFateChildOpsTotal()); - fateErrorsGauge.set(metricValues.getZkConnectionErrors()); for (Entry<String,Long> vals : metricValues.getTxStateCounters().entrySet()) { switch (ReadOnlyFateStore.TStatus.valueOf(vals.getKey())) { @@ 
-126,44 +111,56 @@ public class FateMetrics implements MetricsProducer { @Override public void registerMetrics(final MeterRegistry registry) { + var type = fateStore.type().name().toLowerCase(); + var instanceTypeTag = Tag.of("instanceType", type); + registry.gauge(METRICS_FATE_OPS, totalCurrentOpsGauge); - registry.gauge(METRICS_FATE_OPS_ACTIVITY, totalOpsGauge); - registry.gauge(METRICS_FATE_ERRORS, List.of(Tag.of("type", "zk.connection")), fateErrorsGauge); - registry.gauge(METRICS_FATE_TX, - List.of(Tag.of("state", ReadOnlyFateStore.TStatus.NEW.name().toLowerCase())), newTxGauge); + + registry.gauge(METRICS_FATE_TX, List + .of(Tag.of("state", ReadOnlyFateStore.TStatus.NEW.name().toLowerCase()), instanceTypeTag), + newTxGauge); registry.gauge(METRICS_FATE_TX, - List.of(Tag.of("state", ReadOnlyFateStore.TStatus.SUBMITTED.name().toLowerCase())), + List.of(Tag.of("state", ReadOnlyFateStore.TStatus.SUBMITTED.name().toLowerCase()), + instanceTypeTag), submittedTxGauge); registry.gauge(METRICS_FATE_TX, - List.of(Tag.of("state", ReadOnlyFateStore.TStatus.IN_PROGRESS.name().toLowerCase())), + List.of(Tag.of("state", ReadOnlyFateStore.TStatus.IN_PROGRESS.name().toLowerCase()), + instanceTypeTag), inProgressTxGauge); registry.gauge(METRICS_FATE_TX, - List.of(Tag.of("state", ReadOnlyFateStore.TStatus.FAILED_IN_PROGRESS.name().toLowerCase())), + List.of(Tag.of("state", ReadOnlyFateStore.TStatus.FAILED_IN_PROGRESS.name().toLowerCase()), + instanceTypeTag), failedInProgressTxGauge); registry.gauge(METRICS_FATE_TX, - List.of(Tag.of("state", ReadOnlyFateStore.TStatus.FAILED.name().toLowerCase())), + List.of(Tag.of("state", ReadOnlyFateStore.TStatus.FAILED.name().toLowerCase()), + instanceTypeTag), failedTxGauge); registry.gauge(METRICS_FATE_TX, - List.of(Tag.of("state", ReadOnlyFateStore.TStatus.SUCCESSFUL.name().toLowerCase())), + List.of(Tag.of("state", ReadOnlyFateStore.TStatus.SUCCESSFUL.name().toLowerCase()), + instanceTypeTag), successfulTxGauge); 
registry.gauge(METRICS_FATE_TX, - List.of(Tag.of("state", ReadOnlyFateStore.TStatus.UNKNOWN.name().toLowerCase())), + List.of(Tag.of("state", ReadOnlyFateStore.TStatus.UNKNOWN.name().toLowerCase()), + instanceTypeTag), unknownTxGauge); - update(); - // get fate status is read only operation - no reason to be nice on shutdown. - ScheduledExecutorService scheduler = - ThreadPools.getServerThreadPools().createScheduledExecutorService(1, "fateMetricsPoller"); + ScheduledExecutorService scheduler = ThreadPools.getServerThreadPools() + .createScheduledExecutorService(1, type + "FateMetricsPoller"); Runtime.getRuntime().addShutdownHook(new Thread(scheduler::shutdownNow)); + // Only update as part of the scheduler thread. + // We have to call update() in a new thread because this method to + // register metrics is called on start up in the Manager before it's finished + // initializing, so we can't scan the User fate store until after startup is done. + // If we called update() here in this method directly we would get stuck forever. ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> { try { update(); } catch (Exception ex) { - log.info("Failed to update fate metrics due to exception", ex); + log.info("Failed to update {}fate metrics due to exception", type, ex); } - }, refreshDelay, refreshDelay, TimeUnit.MILLISECONDS); + }, 0, refreshDelay, TimeUnit.MILLISECONDS); ThreadPools.watchNonCriticalScheduledTask(future); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetricValues.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetricValues.java new file mode 100644 index 0000000000..f7ca146c4b --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetricValues.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager.metrics.fate.meta; + +import java.util.Map; +import java.util.Optional; + +import org.apache.accumulo.core.fate.FateInstanceType; +import org.apache.accumulo.core.fate.ReadOnlyFateStore; +import org.apache.accumulo.manager.metrics.fate.FateMetricValues; +import org.apache.accumulo.manager.metrics.fate.FateMetrics; +import org.apache.accumulo.server.ServerContext; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +public class MetaFateMetricValues extends FateMetricValues { + + private static final Logger log = LoggerFactory.getLogger(MetaFateMetricValues.class); + + private final long zkFateChildOpsTotal; + private final long zkConnectionErrors; + + protected MetaFateMetricValues(final long updateTime, final long currentFateOps, + final long zkFateChildOpsTotal, final long zkConnectionErrors, + final Map<String,Long> txStateCounters, final Map<String,Long> opTypeCounters) { + super(updateTime, currentFateOps, txStateCounters, opTypeCounters); + + this.zkFateChildOpsTotal = zkFateChildOpsTotal; + this.zkConnectionErrors = zkConnectionErrors; + + } + + long getZkFateChildOpsTotal() { + return 
zkFateChildOpsTotal; + } + + long getZkConnectionErrors() { + return zkConnectionErrors; + } + + /** + * The FATE transaction stores the transaction type as a debug string in the transaction zknode. + * This method returns a map of counters of the current occurrences of each operation type that is + * IN_PROGRESS + * + * @param context Accumulo context + * @param fateRootPath the zookeeper path to fate info + * @param metaFateStore a readonly MetaFateStore + * @return the current FATE metric values. + */ + public static MetaFateMetricValues getMetaStoreMetrics(final ServerContext context, + final String fateRootPath, + final ReadOnlyFateStore<FateMetrics<MetaFateMetricValues>> metaFateStore) { + Preconditions.checkArgument(metaFateStore.type() == FateInstanceType.META, + "Fate store must be of type %s", FateInstanceType.META); + + Builder builder = null; + + try { + builder = getFateMetrics(metaFateStore, new Builder()); + + Stat node = context.getZooReaderWriter().getZooKeeper().exists(fateRootPath, false); + builder.withZkFateChildOpsTotal(node.getCversion()); + + if (log.isTraceEnabled()) { + log.trace( + "ZkNodeStat: {czxid: {}, mzxid: {}, pzxid: {}, ctime: {}, mtime: {}, " + + "version: {}, cversion: {}, num children: {}", + node.getCzxid(), node.getMzxid(), node.getPzxid(), node.getCtime(), node.getMtime(), + node.getVersion(), node.getCversion(), node.getNumChildren()); + } + + } catch (KeeperException ex) { + log.debug("Error connecting to ZooKeeper", ex); + Optional.ofNullable(builder).ifPresent(Builder::incrZkConnectionErrors); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + + return builder.build(); + } + + @Override + public String toString() { + return "MetaFateMetricValues{updateTime=" + updateTime + ", currentFateOps=" + currentFateOps + + ", zkFateChildOpsTotal=" + zkFateChildOpsTotal + ", zkConnectionErrors=" + + zkConnectionErrors + '}'; + } + + static Builder builder() { + return new Builder(); + } + + static 
class Builder extends AbstractBuilder<Builder,MetaFateMetricValues> { + + private long zkFateChildOpsTotal = 0; + private long zkConnectionErrors = 0; + + Builder() { + + } + + Builder withZkFateChildOpsTotal(final long value) { + this.zkFateChildOpsTotal = value; + return this; + } + + Builder incrZkConnectionErrors() { + this.zkConnectionErrors += 1L; + return this; + } + + Builder withZkConnectionErrors(final long value) { + this.zkConnectionErrors = value; + return this; + } + + @Override + protected MetaFateMetricValues build() { + return new MetaFateMetricValues(System.currentTimeMillis(), currentFateOps, + zkFateChildOpsTotal, zkConnectionErrors, txStateCounters, opTypeCounters); + } + } +} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetrics.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetrics.java new file mode 100644 index 0000000000..24570fa24b --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetrics.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.manager.metrics.fate.meta; + +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.fate.MetaFateStore; +import org.apache.accumulo.core.fate.ReadOnlyFateStore; +import org.apache.accumulo.manager.metrics.fate.FateMetrics; +import org.apache.accumulo.server.ServerContext; +import org.apache.zookeeper.KeeperException; + +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tag; + +public class MetaFateMetrics extends FateMetrics<MetaFateMetricValues> { + + private final String fateRootPath; + private final AtomicLong totalOpsGauge = new AtomicLong(0); + private final AtomicLong fateErrorsGauge = new AtomicLong(0); + + public MetaFateMetrics(ServerContext context, long minimumRefreshDelay) { + super(context, minimumRefreshDelay); + this.fateRootPath = getFateRootPath(context); + } + + @Override + protected void update(MetaFateMetricValues metricValues) { + super.update(metricValues); + totalOpsGauge.set(metricValues.getZkFateChildOpsTotal()); + fateErrorsGauge.set(metricValues.getZkConnectionErrors()); + } + + @Override + public void registerMetrics(MeterRegistry registry) { + super.registerMetrics(registry); + registry.gauge(METRICS_FATE_OPS_ACTIVITY, totalOpsGauge); + registry.gauge(METRICS_FATE_ERRORS, List.of(Tag.of("type", "zk.connection")), fateErrorsGauge); + } + + @Override + protected ReadOnlyFateStore<FateMetrics<MetaFateMetricValues>> buildStore(ServerContext context) { + try { // NOTE(review): recomputes the path instead of reading fateRootPath, presumably because this may run before that field is assigned - confirm + return new MetaFateStore<>(getFateRootPath(context), context.getZooReaderWriter()); + } catch (KeeperException ex) { + throw new IllegalStateException( + "FATE Metrics - Failed to create zoo store - metrics unavailable", ex); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); // re-assert the interrupt flag for callers up the stack + throw new IllegalStateException( + "FATE Metrics - Interrupt received while initializing zoo store", ex); // chain the cause, as the KeeperException branch does + } + } + 
@Override + protected MetaFateMetricValues getMetricValues() { + return MetaFateMetricValues.getMetaStoreMetrics(context, fateRootPath, fateStore); + } + + private static String getFateRootPath(ServerContext context) { + return context.getZooKeeperRoot() + Constants.ZFATE; + } +} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/user/UserFateMetricValues.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/user/UserFateMetricValues.java new file mode 100644 index 0000000000..9d6c605a18 --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/user/UserFateMetricValues.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.manager.metrics.fate.user; + +import java.util.Map; + +import org.apache.accumulo.core.fate.FateInstanceType; +import org.apache.accumulo.core.fate.ReadOnlyFateStore; +import org.apache.accumulo.manager.metrics.fate.FateMetricValues; +import org.apache.accumulo.manager.metrics.fate.FateMetrics; + +import com.google.common.base.Preconditions; + +public class UserFateMetricValues extends FateMetricValues { + + protected UserFateMetricValues(long updateTime, long currentFateOps, + Map<String,Long> txStateCounters, Map<String,Long> opTypeCounters) { + super(updateTime, currentFateOps, txStateCounters, opTypeCounters); + } + + public static UserFateMetricValues getUserStoreMetrics( + final ReadOnlyFateStore<FateMetrics<UserFateMetricValues>> userFateStore) { + Preconditions.checkArgument(userFateStore.type() == FateInstanceType.USER, // only USER-type stores are accepted + "Fate store must be of type %s", FateInstanceType.USER); + Builder builder = getFateMetrics(userFateStore, UserFateMetricValues.builder()); + return builder.build(); + } + + @Override + public String toString() { // label names this class so output is not confused with the meta store's values + return "UserFateMetricValues{updateTime=" + updateTime + ", currentFateOps=" + currentFateOps + + '}'; + } + + static Builder builder() { + return new Builder(); + } + + static class Builder extends AbstractBuilder<Builder,UserFateMetricValues> { + + @Override + protected UserFateMetricValues build() { + return new UserFateMetricValues(System.currentTimeMillis(), currentFateOps, txStateCounters, + opTypeCounters); + } + } +} diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/metrics/fate/FateMetricValuesTest.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/user/UserFateMetrics.java similarity index 50% copy from server/manager/src/test/java/org/apache/accumulo/manager/metrics/fate/FateMetricValuesTest.java copy to server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/user/UserFateMetrics.java index 66702946fb..b26206582d 100644 --- 
a/server/manager/src/test/java/org/apache/accumulo/manager/metrics/fate/FateMetricValuesTest.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/user/UserFateMetrics.java @@ -16,35 +16,26 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.accumulo.manager.metrics.fate; +package org.apache.accumulo.manager.metrics.fate.user; -import static org.junit.jupiter.api.Assertions.assertEquals; +import org.apache.accumulo.core.fate.ReadOnlyFateStore; +import org.apache.accumulo.core.fate.user.UserFateStore; +import org.apache.accumulo.manager.metrics.fate.FateMetrics; +import org.apache.accumulo.server.ServerContext; -import org.junit.jupiter.api.Test; +public class UserFateMetrics extends FateMetrics<UserFateMetricValues> { -public class FateMetricValuesTest { - - @Test - public void defaultValueTest() { - - FateMetricValues v = FateMetricValues.builder().build(); - - assertEquals(0, v.getCurrentFateOps()); - assertEquals(0, v.getZkFateChildOpsTotal()); - assertEquals(0, v.getZkConnectionErrors()); + public UserFateMetrics(ServerContext context, long minimumRefreshDelay) { + super(context, minimumRefreshDelay); } - @Test - public void valueTest() { - - FateMetricValues.Builder builder = FateMetricValues.builder(); - - FateMetricValues v = - builder.withCurrentFateOps(1).withZkFateChildOpsTotal(2).withZkConnectionErrors(3).build(); - - assertEquals(1, v.getCurrentFateOps()); - assertEquals(2, v.getZkFateChildOpsTotal()); - assertEquals(3, v.getZkConnectionErrors()); + @Override + protected ReadOnlyFateStore<FateMetrics<UserFateMetricValues>> buildStore(ServerContext context) { + return new UserFateStore<>(context); + } + @Override + protected UserFateMetricValues getMetricValues() { + return UserFateMetricValues.getUserStoreMetrics(fateStore); } } diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/metrics/fate/FateMetricValuesTest.java 
b/server/manager/src/test/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetricValuesTest.java similarity index 83% rename from server/manager/src/test/java/org/apache/accumulo/manager/metrics/fate/FateMetricValuesTest.java rename to server/manager/src/test/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetricValuesTest.java index 66702946fb..4c3b9e64aa 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/metrics/fate/FateMetricValuesTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetricValuesTest.java @@ -16,18 +16,18 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.accumulo.manager.metrics.fate; +package org.apache.accumulo.manager.metrics.fate.meta; import static org.junit.jupiter.api.Assertions.assertEquals; import org.junit.jupiter.api.Test; -public class FateMetricValuesTest { +public class MetaFateMetricValuesTest { @Test public void defaultValueTest() { - FateMetricValues v = FateMetricValues.builder().build(); + MetaFateMetricValues v = MetaFateMetricValues.builder().build(); assertEquals(0, v.getCurrentFateOps()); assertEquals(0, v.getZkFateChildOpsTotal()); @@ -37,9 +37,9 @@ public class FateMetricValuesTest { @Test public void valueTest() { - FateMetricValues.Builder builder = FateMetricValues.builder(); + MetaFateMetricValues.Builder builder = MetaFateMetricValues.builder(); - FateMetricValues v = + MetaFateMetricValues v = builder.withCurrentFateOps(1).withZkFateChildOpsTotal(2).withZkConnectionErrors(3).build(); assertEquals(1, v.getCurrentFateOps()); diff --git a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java index 499ac189a8..76f83dea3b 100644 --- a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java @@ -47,6 +47,8 @@ import 
org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.fate.FateInstanceType; +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.metrics.MetricsProducer; import org.apache.accumulo.core.spi.metrics.LoggingMeterRegistryFactory; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; @@ -266,7 +268,7 @@ public class MetricsIT extends ConfigurableMacBase implements MetricsProducer { statsDMetrics.stream().filter(line -> line.startsWith("accumulo")) .map(TestStatsDSink::parseStatsDMetric).forEach(a -> { var t = a.getTags(); - log.trace("METRICS, received from statsd - name: '{}' num tags: {}, tags: {} = {}", + log.info("METRICS, received from statsd - name: '{}' num tags: {}, tags: {} = {}", a.getName(), t.size(), t, a.getValue()); // check hostname is always set and is valid assertNotEquals("0.0.0.0", a.getTags().get("host")); @@ -287,4 +289,31 @@ public class MetricsIT extends ConfigurableMacBase implements MetricsProducer { }); } } + + @Test + public void fateMetrics() throws Exception { + doWorkToGenerateMetrics(); + cluster.stop(); + + List<String> statsDMetrics; + + while (!(statsDMetrics = sink.getLines()).isEmpty()) { + statsDMetrics.stream().filter(line -> line.startsWith("accumulo.fate.tx")) + .map(TestStatsDSink::parseStatsDMetric).forEach(a -> { + var t = a.getTags(); + log.debug("METRICS, received from statsd - name: '{}' num tags: {}, tags: {} = {}", + a.getName(), t.size(), t, a.getValue()); + + // Verify the fate metrics contain state and instanceType + // Checking the value would be hard to test because the metrics are updated on a timer + // and fate transactions get cleaned up when finished so the current state is a bit + // non-deterministic + TStatus status = TStatus.valueOf(a.getTags().get("state").toUpperCase()); // throws if the name is not a TStatus constant + 
assertNotNull(status); // valueOf never returns null, so the parse above is the real check + FateInstanceType type = + FateInstanceType.valueOf(a.getTags().get("instanceType").toUpperCase()); // throws on an unknown type name + assertNotNull(type); // likewise cannot fail once valueOf has returned + }); + } + } }