This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 0e37a342dd improves update metrics (#5064)
0e37a342dd is described below

commit 0e37a342ddb39d21c65fab1d0ae26c8e06286df3
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Sat Nov 16 14:10:23 2024 -0500

    improves update metrics (#5064)
    
    Made the following changes to the update metrics:
    
     * Added timer metrics for conditional mutation row locking and condition checking
     * Fixed counting for update metrics that were documented as per-mutation
       but were not actually counting mutations
     * Fixed ingest metric that stated it was counting mutations but was
       actually counting entries.
---
 .../org/apache/accumulo/core/metrics/Metric.java   | 14 +++++---
 .../accumulo/tserver/ConditionalMutationSet.java   |  5 ++-
 .../accumulo/tserver/TabletClientHandler.java      | 37 +++++++++++++++++++---
 .../tserver/metrics/TabletServerMetrics.java       |  6 ++--
 .../tserver/metrics/TabletServerUpdateMetrics.java | 26 ++++++++++++---
 5 files changed, 70 insertions(+), 18 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java 
b/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java
index 7050b73347..8ad14a5597 100644
--- a/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java
+++ b/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java
@@ -133,8 +133,8 @@ public enum Metric {
       "Number of unopened tablets.", MetricCategory.TABLET_SERVER),
   TSERVER_TABLETS_FILES("accumulo.tserver.tablets.files", MetricType.GAUGE,
       "Number of files per tablet.", MetricCategory.TABLET_SERVER),
-  TSERVER_INGEST_MUTATIONS("accumulo.tserver.ingest.mutations", 
MetricType.GAUGE,
-      "Ingest mutation count. The rate can be derived from this metric.",
+  TSERVER_INGEST_ENTRIES("accumulo.tserver.ingest.entries", MetricType.GAUGE,
+      "Ingest entry (a key/value) count. The rate can be derived from this 
metric.",
       MetricCategory.TABLET_SERVER),
   TSERVER_INGEST_BYTES("accumulo.tserver.ingest.bytes", MetricType.GAUGE,
       "Ingest byte count. The rate can be derived from this metric.", 
MetricCategory.TABLET_SERVER),
@@ -211,10 +211,16 @@ public enum Metric {
   UPDATE_ERRORS("accumulo.tserver.updates.error", MetricType.GAUGE,
       "Count of errors during tablet updates. Type/reason for error is stored 
in the `type` tag (e.g., type=permission, type=unknown.tablet, 
type=constraint.violation).",
       MetricCategory.TABLET_SERVER),
+  UPDATE_LOCK("accumulo.tserver.updates.lock", MetricType.TIMER,
+      "Average time taken for conditional mutation to get a row lock.",
+      MetricCategory.TABLET_SERVER),
+  UPDATE_CHECK("accumulo.tserver.updates.check", MetricType.TIMER,
+      "Average time taken for conditional mutation to check conditions.",
+      MetricCategory.TABLET_SERVER),
   UPDATE_COMMIT("accumulo.tserver.updates.commit", MetricType.TIMER,
-      "Time taken to commit a mutation.", MetricCategory.TABLET_SERVER),
+      "Average time taken to commit a mutation.", 
MetricCategory.TABLET_SERVER),
   UPDATE_COMMIT_PREP("accumulo.tserver.updates.commit.prep", MetricType.TIMER,
-      "Time taken to prepare to commit a single mutation.", 
MetricCategory.TABLET_SERVER),
+      "Average time taken to prepare to commit a single mutation.", 
MetricCategory.TABLET_SERVER),
   UPDATE_WALOG_WRITE("accumulo.tserver.updates.walog.write", MetricType.TIMER,
       "Time taken to write a batch of mutations to WAL.", 
MetricCategory.TABLET_SERVER),
   UPDATE_MUTATION_ARRAY_SIZE("accumulo.tserver.updates.mutation.arrays.size",
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionalMutationSet.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionalMutationSet.java
index 0704073179..614e117570 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionalMutationSet.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionalMutationSet.java
@@ -78,10 +78,13 @@ public class ConditionalMutationSet {
     defer(updates, deferred, new DuplicateFilter());
   }
 
-  static void 
sortConditionalMutations(Map<KeyExtent,List<ServerConditionalMutation>> 
updates) {
+  static int 
sortConditionalMutations(Map<KeyExtent,List<ServerConditionalMutation>> 
updates) {
+    int numMutations = 0;
     for (Entry<KeyExtent,List<ServerConditionalMutation>> entry : 
updates.entrySet()) {
       entry.getValue().sort((o1, o2) -> 
WritableComparator.compareBytes(o1.getRow(), 0,
           o1.getRow().length, o2.getRow(), 0, o2.getRow().length));
+      numMutations += entry.getValue().size();
     }
+    return numMutations;
   }
 }
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
index cd303f3d06..a815af72c3 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
@@ -282,6 +282,9 @@ public class TabletClientHandler implements 
TabletServerClientService.Iface,
       server.resourceManager.waitUntilCommitsAreEnabled();
     }
 
+    int preppedMutations = 0;
+    int sendableMutations = 0;
+
     Span span = TraceUtil.startSpan(this.getClass(), "flush::prep");
     try (Scope scope = span.makeCurrent()) {
       for (Entry<Tablet,? extends List<Mutation>> entry : 
us.queuedMutations.entrySet()) {
@@ -291,6 +294,7 @@ public class TabletClientHandler implements 
TabletServerClientService.Iface,
             DurabilityImpl.resolveDurabilty(us.durability, 
tablet.getDurability());
         List<Mutation> mutations = entry.getValue();
         if (!mutations.isEmpty()) {
+          preppedMutations += mutations.size();
           try {
             server.updateMetrics.addMutationArraySize(mutations.size());
 
@@ -309,6 +313,7 @@ public class TabletClientHandler implements 
TabletServerClientService.Iface,
                   loggables.put(session, new TabletMutations(session, 
validMutations, durability));
                 }
                 sendables.put(session, validMutations);
+                sendableMutations += validMutations.size();
               }
 
               if (!prepared.getViolations().isEmpty()) {
@@ -337,7 +342,7 @@ public class TabletClientHandler implements 
TabletServerClientService.Iface,
 
     long pt2 = System.currentTimeMillis();
     us.prepareTimes.addStat(pt2 - pt1);
-    updateAvgPrepTime(pt2 - pt1, us.queuedMutations.size());
+    updateAvgPrepTime(pt2 - pt1, preppedMutations);
 
     if (error != null) {
       sendables.forEach((commitSession, value) -> commitSession.abortCommit());
@@ -392,7 +397,7 @@ public class TabletClientHandler implements 
TabletServerClientService.Iface,
         us.flushTime += (t2 - pt1);
         us.commitTimes.addStat(t2 - t1);
 
-        updateAvgCommitTime(t2 - t1, sendables.size());
+        updateAvgCommitTime(t2 - t1, sendableMutations);
       } finally {
         span3.end();
       }
@@ -407,6 +412,18 @@ public class TabletClientHandler implements 
TabletServerClientService.Iface,
     us.totalUpdates += mutationCount;
   }
 
+  private void updateAverageLockTime(long time, TimeUnit unit, int size) {
+    if (size > 0) {
+      server.updateMetrics.addLockTime((long) (time / (double) size), unit);
+    }
+  }
+
+  private void updateAverageCheckTime(long time, TimeUnit unit, int size) {
+    if (size > 0) {
+      server.updateMetrics.addCheckTime((long) (time / (double) size), unit);
+    }
+  }
+
   private void updateWalogWriteTime(long time) {
     server.updateMetrics.addWalogWriteTime(time);
   }
@@ -559,6 +576,8 @@ public class TabletClientHandler implements 
TabletServerClientService.Iface,
     boolean sessionCanceled = sess.interruptFlag.get();
 
     Span span = TraceUtil.startSpan(this.getClass(), 
"writeConditionalMutations::prep");
+    int preppedMutions = 0;
+    int sendableMutations = 0;
     try (Scope scope = span.makeCurrent()) {
       long t1 = System.currentTimeMillis();
       for (Entry<KeyExtent,List<ServerConditionalMutation>> entry : es) {
@@ -570,6 +589,7 @@ public class TabletClientHandler implements 
TabletServerClientService.Iface,
               DurabilityImpl.resolveDurabilty(sess.durability, 
tablet.getDurability());
 
           List<Mutation> mutations = 
Collections.unmodifiableList(entry.getValue());
+          preppedMutions += mutations.size();
           if (!mutations.isEmpty()) {
 
             PreparedMutations prepared = tablet.prepareMutationsForCommit(
@@ -587,6 +607,7 @@ public class TabletClientHandler implements 
TabletServerClientService.Iface,
                   loggables.put(session, new TabletMutations(session, 
validMutations, durability));
                 }
                 sendables.put(session, validMutations);
+                sendableMutations += validMutations.size();
               }
 
               if (!prepared.getViolators().isEmpty()) {
@@ -598,7 +619,7 @@ public class TabletClientHandler implements 
TabletServerClientService.Iface,
       }
 
       long t2 = System.currentTimeMillis();
-      updateAvgPrepTime(t2 - t1, es.size());
+      updateAvgPrepTime(t2 - t1, preppedMutions);
     } catch (Exception e) {
       TraceUtil.setException(span, e, true);
       throw e;
@@ -636,7 +657,7 @@ public class TabletClientHandler implements 
TabletServerClientService.Iface,
       long t1 = System.currentTimeMillis();
       sendables.forEach(CommitSession::commit);
       long t2 = System.currentTimeMillis();
-      updateAvgCommitTime(t2 - t1, sendables.size());
+      updateAvgCommitTime(t2 - t1, sendableMutations);
     } catch (Exception e) {
       TraceUtil.setException(span3, e, true);
       throw e;
@@ -661,7 +682,7 @@ public class TabletClientHandler implements 
TabletServerClientService.Iface,
       List<String> symbols) throws IOException {
     // sort each list of mutations, this is done to avoid deadlock and doing 
seeks in order is
     // more efficient and detect duplicate rows.
-    ConditionalMutationSet.sortConditionalMutations(updates);
+    int numMutations = 
ConditionalMutationSet.sortConditionalMutations(updates);
 
     Map<KeyExtent,List<ServerConditionalMutation>> deferred = new HashMap<>();
 
@@ -670,11 +691,17 @@ public class TabletClientHandler implements 
TabletServerClientService.Iface,
     ConditionalMutationSet.deferDuplicatesRows(updates, deferred);
 
     // get as many locks as possible w/o blocking... defer any rows that are 
locked
+    long lt1 = System.nanoTime();
     List<RowLock> locks = rowLocks.acquireRowlocks(updates, deferred);
+    long lt2 = System.nanoTime();
+    updateAverageLockTime(lt2 - lt1, TimeUnit.NANOSECONDS, numMutations);
     try {
       Span span = TraceUtil.startSpan(this.getClass(), 
"conditionalUpdate::Check conditions");
       try (Scope scope = span.makeCurrent()) {
+        long t1 = System.nanoTime();
         checkConditions(updates, results, cs, symbols);
+        long t2 = System.nanoTime();
+        updateAverageCheckTime(t2 - t1, TimeUnit.NANOSECONDS, numMutations);
       } catch (Exception e) {
         TraceUtil.setException(span, e, true);
         throw e;
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java
index 68798e6d67..a70112f5f5 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java
@@ -24,7 +24,7 @@ import static 
org.apache.accumulo.core.metrics.Metric.COMPACTOR_MAJC_STUCK;
 import static org.apache.accumulo.core.metrics.Metric.TSERVER_ENTRIES;
 import static org.apache.accumulo.core.metrics.Metric.TSERVER_HOLD;
 import static org.apache.accumulo.core.metrics.Metric.TSERVER_INGEST_BYTES;
-import static org.apache.accumulo.core.metrics.Metric.TSERVER_INGEST_MUTATIONS;
+import static org.apache.accumulo.core.metrics.Metric.TSERVER_INGEST_ENTRIES;
 import static org.apache.accumulo.core.metrics.Metric.TSERVER_MEM_ENTRIES;
 import static org.apache.accumulo.core.metrics.Metric.TSERVER_MINC_QUEUED;
 import static org.apache.accumulo.core.metrics.Metric.TSERVER_MINC_RUNNING;
@@ -119,8 +119,8 @@ public class TabletServerMetrics implements MetricsProducer 
{
         
.description(TSERVER_TABLETS_FILES.getDescription()).register(registry);
     Gauge.builder(TSERVER_HOLD.getName(), util, 
TabletServerMetricsUtil::getHoldTime)
         .description(TSERVER_HOLD.getDescription()).register(registry);
-    Gauge.builder(TSERVER_INGEST_MUTATIONS.getName(), util, 
TabletServerMetricsUtil::getIngestCount)
-        
.description(TSERVER_INGEST_MUTATIONS.getDescription()).register(registry);
+    Gauge.builder(TSERVER_INGEST_ENTRIES.getName(), util, 
TabletServerMetricsUtil::getIngestCount)
+        
.description(TSERVER_INGEST_ENTRIES.getDescription()).register(registry);
     Gauge.builder(TSERVER_INGEST_BYTES.getName(), util, 
TabletServerMetricsUtil::getIngestByteCount)
         .description(TSERVER_INGEST_BYTES.getDescription()).register(registry);
   }
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerUpdateMetrics.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerUpdateMetrics.java
index 52453916cc..ccbfa30f1e 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerUpdateMetrics.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerUpdateMetrics.java
@@ -18,13 +18,15 @@
  */
 package org.apache.accumulo.tserver.metrics;
 
+import static org.apache.accumulo.core.metrics.Metric.UPDATE_CHECK;
 import static org.apache.accumulo.core.metrics.Metric.UPDATE_COMMIT;
 import static org.apache.accumulo.core.metrics.Metric.UPDATE_COMMIT_PREP;
 import static org.apache.accumulo.core.metrics.Metric.UPDATE_ERRORS;
+import static org.apache.accumulo.core.metrics.Metric.UPDATE_LOCK;
 import static 
org.apache.accumulo.core.metrics.Metric.UPDATE_MUTATION_ARRAY_SIZE;
 import static org.apache.accumulo.core.metrics.Metric.UPDATE_WALOG_WRITE;
 
-import java.time.Duration;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.accumulo.core.metrics.MetricsProducer;
@@ -43,6 +45,8 @@ public class TabletServerUpdateMetrics implements 
MetricsProducer {
   private Timer commitPrepStat = NoopMetrics.useNoopTimer();
   private Timer walogWriteTimeStat = NoopMetrics.useNoopTimer();
   private Timer commitTimeStat = NoopMetrics.useNoopTimer();
+  private Timer checkTimeStat = NoopMetrics.useNoopTimer();
+  private Timer lockTimeStat = NoopMetrics.useNoopTimer();
   private DistributionSummary mutationArraySizeStat = 
NoopMetrics.useNoopDistributionSummary();
 
   public void addPermissionErrors(long value) {
@@ -58,21 +62,29 @@ public class TabletServerUpdateMetrics implements 
MetricsProducer {
   }
 
   public void addCommitPrep(long value) {
-    commitPrepStat.record(Duration.ofMillis(value));
+    commitPrepStat.record(value, TimeUnit.MILLISECONDS);
   }
 
   public void addWalogWriteTime(long value) {
-    walogWriteTimeStat.record(Duration.ofMillis(value));
+    walogWriteTimeStat.record(value, TimeUnit.MILLISECONDS);
   }
 
   public void addCommitTime(long value) {
-    commitTimeStat.record(Duration.ofMillis(value));
+    commitTimeStat.record(value, TimeUnit.MILLISECONDS);
   }
 
   public void addMutationArraySize(long value) {
     mutationArraySizeStat.record(value);
   }
 
+  public void addCheckTime(long value, TimeUnit unit) {
+    checkTimeStat.record(value, unit);
+  }
+
+  public void addLockTime(long value, TimeUnit unit) {
+    lockTimeStat.record(value, unit);
+  }
+
   @Override
   public void registerMetrics(MeterRegistry registry) {
     FunctionCounter.builder(UPDATE_ERRORS.getName(), permissionErrorsCount, 
AtomicLong::get)
@@ -91,6 +103,10 @@ public class TabletServerUpdateMetrics implements 
MetricsProducer {
         .description(UPDATE_COMMIT.getDescription()).register(registry);
     mutationArraySizeStat = 
DistributionSummary.builder(UPDATE_MUTATION_ARRAY_SIZE.getName())
         
.description(UPDATE_MUTATION_ARRAY_SIZE.getDescription()).register(registry);
-  }
+    checkTimeStat = 
Timer.builder(UPDATE_CHECK.getName()).description(UPDATE_CHECK.getDescription())
+        .register(registry);
+    lockTimeStat = 
Timer.builder(UPDATE_LOCK.getName()).description(UPDATE_LOCK.getDescription())
+        .register(registry);
 
+  }
 }

Reply via email to