This is an automated email from the ASF dual-hosted git repository.
domgarguilo pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 528711eb17 Add metric that indicates when compactor is busy (#4622)
528711eb17 is described below
commit 528711eb17d89e7d41b88658c22065f870b3c6ad
Author: Dom G <[email protected]>
AuthorDate: Thu Jun 6 11:31:56 2024 -0400
Add metric that indicates when compactor is busy (#4622)
* Add metric that indicates when compactor is busy
* Add metric descriptions to javadoc table
---
.../accumulo/core/metrics/MetricsProducer.java | 27 ++++++-
.../org/apache/accumulo/compactor/Compactor.java | 8 +++
.../compaction/ExternalCompactionProgressIT.java | 84 ++++++++++++++--------
3 files changed, 89 insertions(+), 30 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
index 228baa2165..3fdb1a4309 100644
--- a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
+++ b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
@@ -36,7 +36,7 @@ import io.micrometer.core.instrument.MeterRegistry;
* <a href="https://micrometer.io/docs/concepts#_naming_meters">naming
convention</a> for the
* metrics. The table below contains a mapping of the old to new metric names.
* <table border="1">
- * <caption>Summary of Metric Changes</caption> <!-- fate -->
+ * <caption>Summary of Metric Changes</caption>
* <tr>
* <th>Old Name</th>
* <th>Hadoop Metrics2 Type</th>
@@ -44,6 +44,7 @@ import io.micrometer.core.instrument.MeterRegistry;
* <th>Micrometer Type</th>
* <th>Notes</th>
* </tr>
+ * <!-- compactor -->
* <tr>
* <td>N/A</td>
* <td>N/A</td>
@@ -52,6 +53,29 @@ import io.micrometer.core.instrument.MeterRegistry;
* <td></td>
* </tr>
* <tr>
+ * <td>N/A</td>
+ * <td>N/A</td>
+ * <td>{@value #METRICS_COMPACTOR_ENTRIES_READ}</td>
+ * <td>FunctionCounter</td>
+ * <td>Number of entries read by all threads performing compactions</td>
+ * </tr>
+ * <tr>
+ * <td>N/A</td>
+ * <td>N/A</td>
+ * <td>{@value #METRICS_COMPACTOR_ENTRIES_WRITTEN}</td>
+ * <td>FunctionCounter</td>
+ * <td>Number of entries written by all threads performing compactions</td>
+ * </tr>
+ * <tr>
+ * <td>N/A</td>
+ * <td>N/A</td>
+ * <td>{@value #METRICS_COMPACTOR_BUSY}</td>
+ * <td>Gauge</td>
+ * <td>Indicates if the compactor is busy or not. The value will be 0 when
idle and 1 when
+ * busy.</td>
+ * </tr>
+ * <!-- fate -->
+ * <tr>
* <td>currentFateOps</td>
* <td>Gauge</td>
* <td>{@value #METRICS_FATE_OPS}</td>
@@ -585,6 +609,7 @@ public interface MetricsProducer {
String METRICS_COMPACTOR_MAJC_STUCK = METRICS_COMPACTOR_PREFIX +
"majc.stuck";
String METRICS_COMPACTOR_ENTRIES_READ = METRICS_COMPACTOR_PREFIX +
"entries.read";
String METRICS_COMPACTOR_ENTRIES_WRITTEN = METRICS_COMPACTOR_PREFIX +
"entries.written";
+ String METRICS_COMPACTOR_BUSY = METRICS_COMPACTOR_PREFIX + "busy";
String METRICS_FATE_PREFIX = "accumulo.fate.";
String METRICS_FATE_TYPE_IN_PROGRESS = METRICS_FATE_PREFIX +
"ops.in.progress.by.type";
diff --git
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 7f7509489a..b82358581a 100644
---
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -120,6 +120,7 @@ import com.beust.jcommander.Parameter;
import com.google.common.base.Preconditions;
import io.micrometer.core.instrument.FunctionCounter;
+import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.LongTaskTimer;
import io.micrometer.core.instrument.MeterRegistry;
@@ -193,6 +194,13 @@ public class Compactor extends AbstractServer implements
MetricsProducer, Compac
LongTaskTimer timer = LongTaskTimer.builder(METRICS_COMPACTOR_MAJC_STUCK)
.description("Number and duration of stuck major
compactions").register(registry);
CompactionWatcher.setTimer(timer);
+
+ Gauge
+ .builder(METRICS_COMPACTOR_BUSY, this.compactionRunning,
+ isRunning -> isRunning.get() ? 1 : 0)
+ .description(
+ "Indicates if the compactor is busy or not. The value will be 0
when idle and 1 when busy.")
+ .register(registry);
}
protected void startGCLogger(ScheduledThreadPoolExecutor schedExecutor) {
diff --git
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
index 0cbbc3a97a..c57705807a 100644
---
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
@@ -36,6 +36,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.accumulo.compactor.Compactor;
@@ -79,6 +80,7 @@ import org.slf4j.LoggerFactory;
public class ExternalCompactionProgressIT extends AccumuloClusterHarness {
private static final Logger log =
LoggerFactory.getLogger(ExternalCompactionProgressIT.class);
private static final int ROWS = 10_000;
+ public static final int CHECKER_THREAD_SLEEP_MS = 1_000;
enum EC_PROGRESS {
STARTED, QUARTER, HALF, THREE_QUARTERS, INVALID
@@ -121,6 +123,15 @@ public class ExternalCompactionProgressIT extends
AccumuloClusterHarness {
public void testProgressViaMetrics() throws Exception {
String table = this.getUniqueNames(1)[0];
+ final AtomicLong totalEntriesRead = new AtomicLong(0);
+ final AtomicLong totalEntriesWritten = new AtomicLong(0);
+ final AtomicInteger compactorBusy = new AtomicInteger(-1);
+ final long expectedEntriesRead = 9216;
+ final long expectedEntriesWritten = 4096;
+
+ Thread checkerThread =
+ getMetricsCheckerThread(totalEntriesRead, totalEntriesWritten,
compactorBusy);
+
try (AccumuloClient client =
Accumulo.newClient().from(getCluster().getClientProperties()).build())
{
createTable(client, table, "cs1");
@@ -129,12 +140,6 @@ public class ExternalCompactionProgressIT extends
AccumuloClusterHarness {
cluster.getClusterControl().startCompactors(Compactor.class, 1, QUEUE1);
cluster.getClusterControl().startCoordinator(CompactionCoordinator.class);
- final AtomicLong expectedEntriesRead = new AtomicLong(9216);
- final AtomicLong expectedEntriesWritten = new AtomicLong(4096);
- final AtomicLong totalEntriesRead = new AtomicLong(0);
- final AtomicLong totalEntriesWritten = new AtomicLong(0);
-
- Thread checkerThread = getMetricsCheckerThread(totalEntriesRead,
totalEntriesWritten);
checkerThread.start();
IteratorSetting setting = new IteratorSetting(50, "Slow",
SlowIterator.class);
@@ -142,59 +147,80 @@ public class ExternalCompactionProgressIT extends
AccumuloClusterHarness {
client.tableOperations().attachIterator(table, setting,
EnumSet.of(IteratorUtil.IteratorScope.majc));
log.info("Compacting table");
- compact(client, table, 2, QUEUE1, true);
- log.info("Done Compacting table");
- verify(client, table, 2, ROWS);
+
+ Wait.waitFor(() -> compactorBusy.get() == 0, 30_000,
CHECKER_THREAD_SLEEP_MS,
+ "Compactor busy metric should be false initially");
+
+ compact(client, table, 2, QUEUE1, false);
+
+ Wait.waitFor(() -> compactorBusy.get() == 1, 30_000,
CHECKER_THREAD_SLEEP_MS,
+ "Compactor busy metric should be true after starting compaction");
Wait.waitFor(() -> {
- if (totalEntriesRead.get() == expectedEntriesRead.get()
- && totalEntriesWritten.get() == expectedEntriesWritten.get()) {
+ if (totalEntriesRead.get() == expectedEntriesRead
+ && totalEntriesWritten.get() == expectedEntriesWritten) {
return true;
}
log.info(
"Waiting for entries read to be {} (currently {}) and entries
written to be {} (currently {})",
- expectedEntriesRead.get(), totalEntriesRead.get(),
expectedEntriesWritten.get(),
+ expectedEntriesRead, totalEntriesRead.get(),
expectedEntriesWritten,
totalEntriesWritten.get());
return false;
- }, 30000, 3000, "Entries read and written metrics values did not match
expected values");
+ }, 30_000, CHECKER_THREAD_SLEEP_MS,
+ "Entries read and written metrics values did not match expected
values");
+ log.info("Done Compacting table");
+ verify(client, table, 2, ROWS);
+
+ Wait.waitFor(() -> compactorBusy.get() == 0, 30_000,
CHECKER_THREAD_SLEEP_MS,
+ "Compactor busy metric should be false once compaction completes");
+ } finally {
stopCheckerThread.set(true);
checkerThread.join();
+ getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
+
getCluster().getClusterControl().stopAllServers(ServerType.COMPACTION_COORDINATOR);
}
}
/**
- * Get a thread that checks the metrics for entries read and written.
+ * Pulls metrics from the configured sink and updates the provided variables.
*
* @param totalEntriesRead this is set to the value of the entries read
metric
* @param totalEntriesWritten this is set to the value of the entries
written metric
+ * @param compactorBusy this is set to the value of the compactor busy metric
*/
private static Thread getMetricsCheckerThread(AtomicLong totalEntriesRead,
- AtomicLong totalEntriesWritten) {
+ AtomicLong totalEntriesWritten, AtomicInteger compactorBusy) {
return Threads.createThread("metric-tailer", () -> {
log.info("Starting metric tailer");
sink.getLines().clear();
- while (!stopCheckerThread.get()) {
+ out: while (!stopCheckerThread.get()) {
List<String> statsDMetrics = sink.getLines();
for (String s : statsDMetrics) {
if (stopCheckerThread.get()) {
- break;
+ break out;
+ }
+ TestStatsDSink.Metric metric = TestStatsDSink.parseStatsDMetric(s);
+ if
(!metric.getName().startsWith(MetricsProducer.METRICS_COMPACTOR_PREFIX)) {
+ continue;
}
- if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_ENTRIES_READ)) {
- TestStatsDSink.Metric e = TestStatsDSink.parseStatsDMetric(s);
- int value = Integer.parseInt(e.getValue());
- totalEntriesRead.addAndGet(value);
- log.info("Found entries.read metric: {} with value: {}",
e.getName(), value);
- } else if
(s.startsWith(MetricsProducer.METRICS_COMPACTOR_ENTRIES_WRITTEN)) {
- TestStatsDSink.Metric e = TestStatsDSink.parseStatsDMetric(s);
- int value = Integer.parseInt(e.getValue());
- totalEntriesWritten.addAndGet(value);
- log.info("Found entries.written metric: {} with value: {}",
e.getName(), value);
+ int value = Integer.parseInt(metric.getValue());
+ log.debug("Found metric: {} with value: {}", metric.getName(),
value);
+ switch (metric.getName()) {
+ case MetricsProducer.METRICS_COMPACTOR_ENTRIES_READ:
+ totalEntriesRead.addAndGet(value);
+ break;
+ case MetricsProducer.METRICS_COMPACTOR_ENTRIES_WRITTEN:
+ totalEntriesWritten.addAndGet(value);
+ break;
+ case MetricsProducer.METRICS_COMPACTOR_BUSY:
+ compactorBusy.set(value);
+ break;
}
}
- sleepUninterruptibly(3000, TimeUnit.MILLISECONDS);
+ sleepUninterruptibly(CHECKER_THREAD_SLEEP_MS, TimeUnit.MILLISECONDS);
}
log.info("Metric tailer thread finished");
});
@@ -310,7 +336,7 @@ public class ExternalCompactionProgressIT extends
AccumuloClusterHarness {
try {
while (!stopCheckerThread.get()) {
checkRunning();
- sleepUninterruptibly(1000, TimeUnit.MILLISECONDS);
+ sleepUninterruptibly(CHECKER_THREAD_SLEEP_MS, TimeUnit.MILLISECONDS);
}
} catch (TException e) {
log.warn("{}", e.getMessage(), e);