This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 0e37a342dd improves update metrics (#5064) 0e37a342dd is described below commit 0e37a342ddb39d21c65fab1d0ae26c8e06286df3 Author: Keith Turner <ktur...@apache.org> AuthorDate: Sat Nov 16 14:10:23 2024 -0500 improves update metrics (#5064) Made the following updates to update metrics * Added metrics for conditional mutation check and lock * Fixed counting for update metrics that were documented as per-mutation but were not actually counting mutations * Fixed ingest metric that stated it was counting mutations but was actually counting entries. --- .../org/apache/accumulo/core/metrics/Metric.java | 14 +++++--- .../accumulo/tserver/ConditionalMutationSet.java | 5 ++- .../accumulo/tserver/TabletClientHandler.java | 37 +++++++++++++++++++--- .../tserver/metrics/TabletServerMetrics.java | 6 ++-- .../tserver/metrics/TabletServerUpdateMetrics.java | 26 ++++++++++++--- 5 files changed, 70 insertions(+), 18 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java b/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java index 7050b73347..8ad14a5597 100644 --- a/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java +++ b/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java @@ -133,8 +133,8 @@ public enum Metric { "Number of unopened tablets.", MetricCategory.TABLET_SERVER), TSERVER_TABLETS_FILES("accumulo.tserver.tablets.files", MetricType.GAUGE, "Number of files per tablet.", MetricCategory.TABLET_SERVER), - TSERVER_INGEST_MUTATIONS("accumulo.tserver.ingest.mutations", MetricType.GAUGE, - "Ingest mutation count. The rate can be derived from this metric.", + TSERVER_INGEST_ENTRIES("accumulo.tserver.ingest.entries", MetricType.GAUGE, + "Ingest entry (a key/value) count. The rate can be derived from this metric.", MetricCategory.TABLET_SERVER), TSERVER_INGEST_BYTES("accumulo.tserver.ingest.bytes", MetricType.GAUGE, "Ingest byte count. 
The rate can be derived from this metric.", MetricCategory.TABLET_SERVER), @@ -211,10 +211,16 @@ public enum Metric { UPDATE_ERRORS("accumulo.tserver.updates.error", MetricType.GAUGE, "Count of errors during tablet updates. Type/reason for error is stored in the `type` tag (e.g., type=permission, type=unknown.tablet, type=constraint.violation).", MetricCategory.TABLET_SERVER), + UPDATE_LOCK("accumulo.tserver.updates.lock", MetricType.TIMER, + "Average time taken for conditional mutation to get a row lock.", + MetricCategory.TABLET_SERVER), + UPDATE_CHECK("accumulo.tserver.updates.check", MetricType.TIMER, + "Average time taken for conditional mutation to check conditions.", + MetricCategory.TABLET_SERVER), UPDATE_COMMIT("accumulo.tserver.updates.commit", MetricType.TIMER, - "Time taken to commit a mutation.", MetricCategory.TABLET_SERVER), + "Average time taken to commit a mutation.", MetricCategory.TABLET_SERVER), UPDATE_COMMIT_PREP("accumulo.tserver.updates.commit.prep", MetricType.TIMER, - "Time taken to prepare to commit a single mutation.", MetricCategory.TABLET_SERVER), + "Average time taken to prepare to commit a single mutation.", MetricCategory.TABLET_SERVER), UPDATE_WALOG_WRITE("accumulo.tserver.updates.walog.write", MetricType.TIMER, "Time taken to write a batch of mutations to WAL.", MetricCategory.TABLET_SERVER), UPDATE_MUTATION_ARRAY_SIZE("accumulo.tserver.updates.mutation.arrays.size", diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionalMutationSet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionalMutationSet.java index 0704073179..614e117570 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionalMutationSet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionalMutationSet.java @@ -78,10 +78,13 @@ public class ConditionalMutationSet { defer(updates, deferred, new DuplicateFilter()); } - static void 
sortConditionalMutations(Map<KeyExtent,List<ServerConditionalMutation>> updates) { + static int sortConditionalMutations(Map<KeyExtent,List<ServerConditionalMutation>> updates) { + int numMutations = 0; for (Entry<KeyExtent,List<ServerConditionalMutation>> entry : updates.entrySet()) { entry.getValue().sort((o1, o2) -> WritableComparator.compareBytes(o1.getRow(), 0, o1.getRow().length, o2.getRow(), 0, o2.getRow().length)); + numMutations += entry.getValue().size(); } + return numMutations; } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java index cd303f3d06..a815af72c3 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java @@ -282,6 +282,9 @@ public class TabletClientHandler implements TabletServerClientService.Iface, server.resourceManager.waitUntilCommitsAreEnabled(); } + int preppedMutations = 0; + int sendableMutations = 0; + Span span = TraceUtil.startSpan(this.getClass(), "flush::prep"); try (Scope scope = span.makeCurrent()) { for (Entry<Tablet,? 
extends List<Mutation>> entry : us.queuedMutations.entrySet()) { @@ -291,6 +294,7 @@ public class TabletClientHandler implements TabletServerClientService.Iface, DurabilityImpl.resolveDurabilty(us.durability, tablet.getDurability()); List<Mutation> mutations = entry.getValue(); if (!mutations.isEmpty()) { + preppedMutations += mutations.size(); try { server.updateMetrics.addMutationArraySize(mutations.size()); @@ -309,6 +313,7 @@ public class TabletClientHandler implements TabletServerClientService.Iface, loggables.put(session, new TabletMutations(session, validMutations, durability)); } sendables.put(session, validMutations); + sendableMutations += validMutations.size(); } if (!prepared.getViolations().isEmpty()) { @@ -337,7 +342,7 @@ public class TabletClientHandler implements TabletServerClientService.Iface, long pt2 = System.currentTimeMillis(); us.prepareTimes.addStat(pt2 - pt1); - updateAvgPrepTime(pt2 - pt1, us.queuedMutations.size()); + updateAvgPrepTime(pt2 - pt1, preppedMutations); if (error != null) { sendables.forEach((commitSession, value) -> commitSession.abortCommit()); @@ -392,7 +397,7 @@ public class TabletClientHandler implements TabletServerClientService.Iface, us.flushTime += (t2 - pt1); us.commitTimes.addStat(t2 - t1); - updateAvgCommitTime(t2 - t1, sendables.size()); + updateAvgCommitTime(t2 - t1, sendableMutations); } finally { span3.end(); } @@ -407,6 +412,18 @@ public class TabletClientHandler implements TabletServerClientService.Iface, us.totalUpdates += mutationCount; } + private void updateAverageLockTime(long time, TimeUnit unit, int size) { + if (size > 0) { + server.updateMetrics.addLockTime((long) (time / (double) size), unit); + } + } + + private void updateAverageCheckTime(long time, TimeUnit unit, int size) { + if (size > 0) { + server.updateMetrics.addCheckTime((long) (time / (double) size), unit); + } + } + private void updateWalogWriteTime(long time) { server.updateMetrics.addWalogWriteTime(time); } @@ -559,6 +576,8 @@ public 
class TabletClientHandler implements TabletServerClientService.Iface, boolean sessionCanceled = sess.interruptFlag.get(); Span span = TraceUtil.startSpan(this.getClass(), "writeConditionalMutations::prep"); + int preppedMutions = 0; + int sendableMutations = 0; try (Scope scope = span.makeCurrent()) { long t1 = System.currentTimeMillis(); for (Entry<KeyExtent,List<ServerConditionalMutation>> entry : es) { @@ -570,6 +589,7 @@ public class TabletClientHandler implements TabletServerClientService.Iface, DurabilityImpl.resolveDurabilty(sess.durability, tablet.getDurability()); List<Mutation> mutations = Collections.unmodifiableList(entry.getValue()); + preppedMutions += mutations.size(); if (!mutations.isEmpty()) { PreparedMutations prepared = tablet.prepareMutationsForCommit( @@ -587,6 +607,7 @@ public class TabletClientHandler implements TabletServerClientService.Iface, loggables.put(session, new TabletMutations(session, validMutations, durability)); } sendables.put(session, validMutations); + sendableMutations += validMutations.size(); } if (!prepared.getViolators().isEmpty()) { @@ -598,7 +619,7 @@ public class TabletClientHandler implements TabletServerClientService.Iface, } long t2 = System.currentTimeMillis(); - updateAvgPrepTime(t2 - t1, es.size()); + updateAvgPrepTime(t2 - t1, preppedMutions); } catch (Exception e) { TraceUtil.setException(span, e, true); throw e; @@ -636,7 +657,7 @@ public class TabletClientHandler implements TabletServerClientService.Iface, long t1 = System.currentTimeMillis(); sendables.forEach(CommitSession::commit); long t2 = System.currentTimeMillis(); - updateAvgCommitTime(t2 - t1, sendables.size()); + updateAvgCommitTime(t2 - t1, sendableMutations); } catch (Exception e) { TraceUtil.setException(span3, e, true); throw e; @@ -661,7 +682,7 @@ public class TabletClientHandler implements TabletServerClientService.Iface, List<String> symbols) throws IOException { // sort each list of mutations, this is done to avoid deadlock and doing seeks 
in order is // more efficient and detect duplicate rows. - ConditionalMutationSet.sortConditionalMutations(updates); + int numMutations = ConditionalMutationSet.sortConditionalMutations(updates); Map<KeyExtent,List<ServerConditionalMutation>> deferred = new HashMap<>(); @@ -670,11 +691,17 @@ public class TabletClientHandler implements TabletServerClientService.Iface, ConditionalMutationSet.deferDuplicatesRows(updates, deferred); // get as many locks as possible w/o blocking... defer any rows that are locked + long lt1 = System.nanoTime(); List<RowLock> locks = rowLocks.acquireRowlocks(updates, deferred); + long lt2 = System.nanoTime(); + updateAverageLockTime(lt2 - lt1, TimeUnit.NANOSECONDS, numMutations); try { Span span = TraceUtil.startSpan(this.getClass(), "conditionalUpdate::Check conditions"); try (Scope scope = span.makeCurrent()) { + long t1 = System.nanoTime(); checkConditions(updates, results, cs, symbols); + long t2 = System.nanoTime(); + updateAverageCheckTime(t2 - t1, TimeUnit.NANOSECONDS, numMutations); } catch (Exception e) { TraceUtil.setException(span, e, true); throw e; diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java index 68798e6d67..a70112f5f5 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java @@ -24,7 +24,7 @@ import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_MAJC_STUCK; import static org.apache.accumulo.core.metrics.Metric.TSERVER_ENTRIES; import static org.apache.accumulo.core.metrics.Metric.TSERVER_HOLD; import static org.apache.accumulo.core.metrics.Metric.TSERVER_INGEST_BYTES; -import static org.apache.accumulo.core.metrics.Metric.TSERVER_INGEST_MUTATIONS; +import static org.apache.accumulo.core.metrics.Metric.TSERVER_INGEST_ENTRIES; 
import static org.apache.accumulo.core.metrics.Metric.TSERVER_MEM_ENTRIES; import static org.apache.accumulo.core.metrics.Metric.TSERVER_MINC_QUEUED; import static org.apache.accumulo.core.metrics.Metric.TSERVER_MINC_RUNNING; @@ -119,8 +119,8 @@ public class TabletServerMetrics implements MetricsProducer { .description(TSERVER_TABLETS_FILES.getDescription()).register(registry); Gauge.builder(TSERVER_HOLD.getName(), util, TabletServerMetricsUtil::getHoldTime) .description(TSERVER_HOLD.getDescription()).register(registry); - Gauge.builder(TSERVER_INGEST_MUTATIONS.getName(), util, TabletServerMetricsUtil::getIngestCount) - .description(TSERVER_INGEST_MUTATIONS.getDescription()).register(registry); + Gauge.builder(TSERVER_INGEST_ENTRIES.getName(), util, TabletServerMetricsUtil::getIngestCount) + .description(TSERVER_INGEST_ENTRIES.getDescription()).register(registry); Gauge.builder(TSERVER_INGEST_BYTES.getName(), util, TabletServerMetricsUtil::getIngestByteCount) .description(TSERVER_INGEST_BYTES.getDescription()).register(registry); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerUpdateMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerUpdateMetrics.java index 52453916cc..ccbfa30f1e 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerUpdateMetrics.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerUpdateMetrics.java @@ -18,13 +18,15 @@ */ package org.apache.accumulo.tserver.metrics; +import static org.apache.accumulo.core.metrics.Metric.UPDATE_CHECK; import static org.apache.accumulo.core.metrics.Metric.UPDATE_COMMIT; import static org.apache.accumulo.core.metrics.Metric.UPDATE_COMMIT_PREP; import static org.apache.accumulo.core.metrics.Metric.UPDATE_ERRORS; +import static org.apache.accumulo.core.metrics.Metric.UPDATE_LOCK; import static org.apache.accumulo.core.metrics.Metric.UPDATE_MUTATION_ARRAY_SIZE; 
import static org.apache.accumulo.core.metrics.Metric.UPDATE_WALOG_WRITE; -import java.time.Duration; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.accumulo.core.metrics.MetricsProducer; @@ -43,6 +45,8 @@ public class TabletServerUpdateMetrics implements MetricsProducer { private Timer commitPrepStat = NoopMetrics.useNoopTimer(); private Timer walogWriteTimeStat = NoopMetrics.useNoopTimer(); private Timer commitTimeStat = NoopMetrics.useNoopTimer(); + private Timer checkTimeStat = NoopMetrics.useNoopTimer(); + private Timer lockTimeStat = NoopMetrics.useNoopTimer(); private DistributionSummary mutationArraySizeStat = NoopMetrics.useNoopDistributionSummary(); public void addPermissionErrors(long value) { @@ -58,21 +62,29 @@ public class TabletServerUpdateMetrics implements MetricsProducer { } public void addCommitPrep(long value) { - commitPrepStat.record(Duration.ofMillis(value)); + commitPrepStat.record(value, TimeUnit.MILLISECONDS); } public void addWalogWriteTime(long value) { - walogWriteTimeStat.record(Duration.ofMillis(value)); + walogWriteTimeStat.record(value, TimeUnit.MILLISECONDS); } public void addCommitTime(long value) { - commitTimeStat.record(Duration.ofMillis(value)); + commitTimeStat.record(value, TimeUnit.MILLISECONDS); } public void addMutationArraySize(long value) { mutationArraySizeStat.record(value); } + public void addCheckTime(long value, TimeUnit unit) { + checkTimeStat.record(value, unit); + } + + public void addLockTime(long value, TimeUnit unit) { + lockTimeStat.record(value, unit); + } + @Override public void registerMetrics(MeterRegistry registry) { FunctionCounter.builder(UPDATE_ERRORS.getName(), permissionErrorsCount, AtomicLong::get) @@ -91,6 +103,10 @@ public class TabletServerUpdateMetrics implements MetricsProducer { .description(UPDATE_COMMIT.getDescription()).register(registry); mutationArraySizeStat = DistributionSummary.builder(UPDATE_MUTATION_ARRAY_SIZE.getName()) 
.description(UPDATE_MUTATION_ARRAY_SIZE.getDescription()).register(registry); - } + checkTimeStat = Timer.builder(UPDATE_CHECK.getName()).description(UPDATE_CHECK.getDescription()) + .register(registry); + lockTimeStat = Timer.builder(UPDATE_LOCK.getName()).description(UPDATE_LOCK.getDescription()) + .register(registry); + } }