This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 0e37a342dd improves update metrics (#5064)
0e37a342dd is described below
commit 0e37a342ddb39d21c65fab1d0ae26c8e06286df3
Author: Keith Turner <[email protected]>
AuthorDate: Sat Nov 16 14:10:23 2024 -0500
improves update metrics (#5064)
Made the following changes to the tablet server update metrics:
* Added timer metrics for the time a conditional mutation spends acquiring
its row lock and checking its conditions
* Fixed counting for update metrics that were documented as per-mutation
averages but were actually divided by the number of tablets/sessions rather
than the number of mutations
* Renamed the ingest metric (accumulo.tserver.ingest.mutations ->
accumulo.tserver.ingest.entries), which stated it was counting mutations but
was actually counting entries (key/value pairs)
---
.../org/apache/accumulo/core/metrics/Metric.java | 14 +++++---
.../accumulo/tserver/ConditionalMutationSet.java | 5 ++-
.../accumulo/tserver/TabletClientHandler.java | 37 +++++++++++++++++++---
.../tserver/metrics/TabletServerMetrics.java | 6 ++--
.../tserver/metrics/TabletServerUpdateMetrics.java | 26 ++++++++++++---
5 files changed, 70 insertions(+), 18 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java
b/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java
index 7050b73347..8ad14a5597 100644
--- a/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java
+++ b/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java
@@ -133,8 +133,8 @@ public enum Metric {
"Number of unopened tablets.", MetricCategory.TABLET_SERVER),
TSERVER_TABLETS_FILES("accumulo.tserver.tablets.files", MetricType.GAUGE,
"Number of files per tablet.", MetricCategory.TABLET_SERVER),
- TSERVER_INGEST_MUTATIONS("accumulo.tserver.ingest.mutations",
MetricType.GAUGE,
- "Ingest mutation count. The rate can be derived from this metric.",
+ TSERVER_INGEST_ENTRIES("accumulo.tserver.ingest.entries", MetricType.GAUGE,
+ "Ingest entry (a key/value) count. The rate can be derived from this
metric.",
MetricCategory.TABLET_SERVER),
TSERVER_INGEST_BYTES("accumulo.tserver.ingest.bytes", MetricType.GAUGE,
"Ingest byte count. The rate can be derived from this metric.",
MetricCategory.TABLET_SERVER),
@@ -211,10 +211,16 @@ public enum Metric {
UPDATE_ERRORS("accumulo.tserver.updates.error", MetricType.GAUGE,
"Count of errors during tablet updates. Type/reason for error is stored
in the `type` tag (e.g., type=permission, type=unknown.tablet,
type=constraint.violation).",
MetricCategory.TABLET_SERVER),
+ UPDATE_LOCK("accumulo.tserver.updates.lock", MetricType.TIMER,
+ "Average time taken for conditional mutation to get a row lock.",
+ MetricCategory.TABLET_SERVER),
+ UPDATE_CHECK("accumulo.tserver.updates.check", MetricType.TIMER,
+ "Average time taken for conditional mutation to check conditions.",
+ MetricCategory.TABLET_SERVER),
UPDATE_COMMIT("accumulo.tserver.updates.commit", MetricType.TIMER,
- "Time taken to commit a mutation.", MetricCategory.TABLET_SERVER),
+ "Average time taken to commit a mutation.",
MetricCategory.TABLET_SERVER),
UPDATE_COMMIT_PREP("accumulo.tserver.updates.commit.prep", MetricType.TIMER,
- "Time taken to prepare to commit a single mutation.",
MetricCategory.TABLET_SERVER),
+ "Average time taken to prepare to commit a single mutation.",
MetricCategory.TABLET_SERVER),
UPDATE_WALOG_WRITE("accumulo.tserver.updates.walog.write", MetricType.TIMER,
"Time taken to write a batch of mutations to WAL.",
MetricCategory.TABLET_SERVER),
UPDATE_MUTATION_ARRAY_SIZE("accumulo.tserver.updates.mutation.arrays.size",
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionalMutationSet.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionalMutationSet.java
index 0704073179..614e117570 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionalMutationSet.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionalMutationSet.java
@@ -78,10 +78,13 @@ public class ConditionalMutationSet {
defer(updates, deferred, new DuplicateFilter());
}
- static void
sortConditionalMutations(Map<KeyExtent,List<ServerConditionalMutation>>
updates) {
+ static int
sortConditionalMutations(Map<KeyExtent,List<ServerConditionalMutation>>
updates) {
+ int numMutations = 0;
for (Entry<KeyExtent,List<ServerConditionalMutation>> entry :
updates.entrySet()) {
entry.getValue().sort((o1, o2) ->
WritableComparator.compareBytes(o1.getRow(), 0,
o1.getRow().length, o2.getRow(), 0, o2.getRow().length));
+ numMutations += entry.getValue().size();
}
+ return numMutations;
}
}
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
index cd303f3d06..a815af72c3 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
@@ -282,6 +282,9 @@ public class TabletClientHandler implements
TabletServerClientService.Iface,
server.resourceManager.waitUntilCommitsAreEnabled();
}
+ int preppedMutations = 0;
+ int sendableMutations = 0;
+
Span span = TraceUtil.startSpan(this.getClass(), "flush::prep");
try (Scope scope = span.makeCurrent()) {
for (Entry<Tablet,? extends List<Mutation>> entry :
us.queuedMutations.entrySet()) {
@@ -291,6 +294,7 @@ public class TabletClientHandler implements
TabletServerClientService.Iface,
DurabilityImpl.resolveDurabilty(us.durability,
tablet.getDurability());
List<Mutation> mutations = entry.getValue();
if (!mutations.isEmpty()) {
+ preppedMutations += mutations.size();
try {
server.updateMetrics.addMutationArraySize(mutations.size());
@@ -309,6 +313,7 @@ public class TabletClientHandler implements
TabletServerClientService.Iface,
loggables.put(session, new TabletMutations(session,
validMutations, durability));
}
sendables.put(session, validMutations);
+ sendableMutations += validMutations.size();
}
if (!prepared.getViolations().isEmpty()) {
@@ -337,7 +342,7 @@ public class TabletClientHandler implements
TabletServerClientService.Iface,
long pt2 = System.currentTimeMillis();
us.prepareTimes.addStat(pt2 - pt1);
- updateAvgPrepTime(pt2 - pt1, us.queuedMutations.size());
+ updateAvgPrepTime(pt2 - pt1, preppedMutations);
if (error != null) {
sendables.forEach((commitSession, value) -> commitSession.abortCommit());
@@ -392,7 +397,7 @@ public class TabletClientHandler implements
TabletServerClientService.Iface,
us.flushTime += (t2 - pt1);
us.commitTimes.addStat(t2 - t1);
- updateAvgCommitTime(t2 - t1, sendables.size());
+ updateAvgCommitTime(t2 - t1, sendableMutations);
} finally {
span3.end();
}
@@ -407,6 +412,18 @@ public class TabletClientHandler implements
TabletServerClientService.Iface,
us.totalUpdates += mutationCount;
}
+ private void updateAverageLockTime(long time, TimeUnit unit, int size) {
+ if (size > 0) {
+ server.updateMetrics.addLockTime((long) (time / (double) size), unit);
+ }
+ }
+
+ private void updateAverageCheckTime(long time, TimeUnit unit, int size) {
+ if (size > 0) {
+ server.updateMetrics.addCheckTime((long) (time / (double) size), unit);
+ }
+ }
+
private void updateWalogWriteTime(long time) {
server.updateMetrics.addWalogWriteTime(time);
}
@@ -559,6 +576,8 @@ public class TabletClientHandler implements
TabletServerClientService.Iface,
boolean sessionCanceled = sess.interruptFlag.get();
Span span = TraceUtil.startSpan(this.getClass(),
"writeConditionalMutations::prep");
+ int preppedMutions = 0;
+ int sendableMutations = 0;
try (Scope scope = span.makeCurrent()) {
long t1 = System.currentTimeMillis();
for (Entry<KeyExtent,List<ServerConditionalMutation>> entry : es) {
@@ -570,6 +589,7 @@ public class TabletClientHandler implements
TabletServerClientService.Iface,
DurabilityImpl.resolveDurabilty(sess.durability,
tablet.getDurability());
List<Mutation> mutations =
Collections.unmodifiableList(entry.getValue());
+ preppedMutions += mutations.size();
if (!mutations.isEmpty()) {
PreparedMutations prepared = tablet.prepareMutationsForCommit(
@@ -587,6 +607,7 @@ public class TabletClientHandler implements
TabletServerClientService.Iface,
loggables.put(session, new TabletMutations(session,
validMutations, durability));
}
sendables.put(session, validMutations);
+ sendableMutations += validMutations.size();
}
if (!prepared.getViolators().isEmpty()) {
@@ -598,7 +619,7 @@ public class TabletClientHandler implements
TabletServerClientService.Iface,
}
long t2 = System.currentTimeMillis();
- updateAvgPrepTime(t2 - t1, es.size());
+ updateAvgPrepTime(t2 - t1, preppedMutions);
} catch (Exception e) {
TraceUtil.setException(span, e, true);
throw e;
@@ -636,7 +657,7 @@ public class TabletClientHandler implements
TabletServerClientService.Iface,
long t1 = System.currentTimeMillis();
sendables.forEach(CommitSession::commit);
long t2 = System.currentTimeMillis();
- updateAvgCommitTime(t2 - t1, sendables.size());
+ updateAvgCommitTime(t2 - t1, sendableMutations);
} catch (Exception e) {
TraceUtil.setException(span3, e, true);
throw e;
@@ -661,7 +682,7 @@ public class TabletClientHandler implements
TabletServerClientService.Iface,
List<String> symbols) throws IOException {
// sort each list of mutations, this is done to avoid deadlock and doing
seeks in order is
// more efficient and detect duplicate rows.
- ConditionalMutationSet.sortConditionalMutations(updates);
+ int numMutations =
ConditionalMutationSet.sortConditionalMutations(updates);
Map<KeyExtent,List<ServerConditionalMutation>> deferred = new HashMap<>();
@@ -670,11 +691,17 @@ public class TabletClientHandler implements
TabletServerClientService.Iface,
ConditionalMutationSet.deferDuplicatesRows(updates, deferred);
// get as many locks as possible w/o blocking... defer any rows that are
locked
+ long lt1 = System.nanoTime();
List<RowLock> locks = rowLocks.acquireRowlocks(updates, deferred);
+ long lt2 = System.nanoTime();
+ updateAverageLockTime(lt2 - lt1, TimeUnit.NANOSECONDS, numMutations);
try {
Span span = TraceUtil.startSpan(this.getClass(),
"conditionalUpdate::Check conditions");
try (Scope scope = span.makeCurrent()) {
+ long t1 = System.nanoTime();
checkConditions(updates, results, cs, symbols);
+ long t2 = System.nanoTime();
+ updateAverageCheckTime(t2 - t1, TimeUnit.NANOSECONDS, numMutations);
} catch (Exception e) {
TraceUtil.setException(span, e, true);
throw e;
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java
index 68798e6d67..a70112f5f5 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java
@@ -24,7 +24,7 @@ import static
org.apache.accumulo.core.metrics.Metric.COMPACTOR_MAJC_STUCK;
import static org.apache.accumulo.core.metrics.Metric.TSERVER_ENTRIES;
import static org.apache.accumulo.core.metrics.Metric.TSERVER_HOLD;
import static org.apache.accumulo.core.metrics.Metric.TSERVER_INGEST_BYTES;
-import static org.apache.accumulo.core.metrics.Metric.TSERVER_INGEST_MUTATIONS;
+import static org.apache.accumulo.core.metrics.Metric.TSERVER_INGEST_ENTRIES;
import static org.apache.accumulo.core.metrics.Metric.TSERVER_MEM_ENTRIES;
import static org.apache.accumulo.core.metrics.Metric.TSERVER_MINC_QUEUED;
import static org.apache.accumulo.core.metrics.Metric.TSERVER_MINC_RUNNING;
@@ -119,8 +119,8 @@ public class TabletServerMetrics implements MetricsProducer
{
.description(TSERVER_TABLETS_FILES.getDescription()).register(registry);
Gauge.builder(TSERVER_HOLD.getName(), util,
TabletServerMetricsUtil::getHoldTime)
.description(TSERVER_HOLD.getDescription()).register(registry);
- Gauge.builder(TSERVER_INGEST_MUTATIONS.getName(), util,
TabletServerMetricsUtil::getIngestCount)
-
.description(TSERVER_INGEST_MUTATIONS.getDescription()).register(registry);
+ Gauge.builder(TSERVER_INGEST_ENTRIES.getName(), util,
TabletServerMetricsUtil::getIngestCount)
+
.description(TSERVER_INGEST_ENTRIES.getDescription()).register(registry);
Gauge.builder(TSERVER_INGEST_BYTES.getName(), util,
TabletServerMetricsUtil::getIngestByteCount)
.description(TSERVER_INGEST_BYTES.getDescription()).register(registry);
}
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerUpdateMetrics.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerUpdateMetrics.java
index 52453916cc..ccbfa30f1e 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerUpdateMetrics.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerUpdateMetrics.java
@@ -18,13 +18,15 @@
*/
package org.apache.accumulo.tserver.metrics;
+import static org.apache.accumulo.core.metrics.Metric.UPDATE_CHECK;
import static org.apache.accumulo.core.metrics.Metric.UPDATE_COMMIT;
import static org.apache.accumulo.core.metrics.Metric.UPDATE_COMMIT_PREP;
import static org.apache.accumulo.core.metrics.Metric.UPDATE_ERRORS;
+import static org.apache.accumulo.core.metrics.Metric.UPDATE_LOCK;
import static
org.apache.accumulo.core.metrics.Metric.UPDATE_MUTATION_ARRAY_SIZE;
import static org.apache.accumulo.core.metrics.Metric.UPDATE_WALOG_WRITE;
-import java.time.Duration;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.accumulo.core.metrics.MetricsProducer;
@@ -43,6 +45,8 @@ public class TabletServerUpdateMetrics implements
MetricsProducer {
private Timer commitPrepStat = NoopMetrics.useNoopTimer();
private Timer walogWriteTimeStat = NoopMetrics.useNoopTimer();
private Timer commitTimeStat = NoopMetrics.useNoopTimer();
+ private Timer checkTimeStat = NoopMetrics.useNoopTimer();
+ private Timer lockTimeStat = NoopMetrics.useNoopTimer();
private DistributionSummary mutationArraySizeStat =
NoopMetrics.useNoopDistributionSummary();
public void addPermissionErrors(long value) {
@@ -58,21 +62,29 @@ public class TabletServerUpdateMetrics implements
MetricsProducer {
}
public void addCommitPrep(long value) {
- commitPrepStat.record(Duration.ofMillis(value));
+ commitPrepStat.record(value, TimeUnit.MILLISECONDS);
}
public void addWalogWriteTime(long value) {
- walogWriteTimeStat.record(Duration.ofMillis(value));
+ walogWriteTimeStat.record(value, TimeUnit.MILLISECONDS);
}
public void addCommitTime(long value) {
- commitTimeStat.record(Duration.ofMillis(value));
+ commitTimeStat.record(value, TimeUnit.MILLISECONDS);
}
public void addMutationArraySize(long value) {
mutationArraySizeStat.record(value);
}
+ public void addCheckTime(long value, TimeUnit unit) {
+ checkTimeStat.record(value, unit);
+ }
+
+ public void addLockTime(long value, TimeUnit unit) {
+ lockTimeStat.record(value, unit);
+ }
+
@Override
public void registerMetrics(MeterRegistry registry) {
FunctionCounter.builder(UPDATE_ERRORS.getName(), permissionErrorsCount,
AtomicLong::get)
@@ -91,6 +103,10 @@ public class TabletServerUpdateMetrics implements
MetricsProducer {
.description(UPDATE_COMMIT.getDescription()).register(registry);
mutationArraySizeStat =
DistributionSummary.builder(UPDATE_MUTATION_ARRAY_SIZE.getName())
.description(UPDATE_MUTATION_ARRAY_SIZE.getDescription()).register(registry);
- }
+ checkTimeStat =
Timer.builder(UPDATE_CHECK.getName()).description(UPDATE_CHECK.getDescription())
+ .register(registry);
+ lockTimeStat =
Timer.builder(UPDATE_LOCK.getName()).description(UPDATE_LOCK.getDescription())
+ .register(registry);
+ }
}