This is an automated email from the ASF dual-hosted git repository. domgarguilo pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new 528711eb17 Add metric that indicates when compactor is busy (#4622) 528711eb17 is described below commit 528711eb17d89e7d41b88658c22065f870b3c6ad Author: Dom G <domgargu...@apache.org> AuthorDate: Thu Jun 6 11:31:56 2024 -0400 Add metric that indicates when compactor is busy (#4622) * Add metric that indicates when compactor is busy * Add metric descriptions to javadoc table --- .../accumulo/core/metrics/MetricsProducer.java | 27 ++++++- .../org/apache/accumulo/compactor/Compactor.java | 8 +++ .../compaction/ExternalCompactionProgressIT.java | 84 ++++++++++++++-------- 3 files changed, 89 insertions(+), 30 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java index 228baa2165..3fdb1a4309 100644 --- a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java +++ b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java @@ -36,7 +36,7 @@ import io.micrometer.core.instrument.MeterRegistry; * <a href="https://micrometer.io/docs/concepts#_naming_meters">naming convention</a> for the * metrics. The table below contains a mapping of the old to new metric names. * <table border="1"> - * <caption>Summary of Metric Changes</caption> <!-- fate --> + * <caption>Summary of Metric Changes</caption> * <tr> * <th>Old Name</th> * <th>Hadoop Metrics2 Type</th> @@ -44,6 +44,7 @@ import io.micrometer.core.instrument.MeterRegistry; * <th>Micrometer Type</th> * <th>Notes</th> * </tr> + * <!-- compactor --> * <tr> * <td>N/A</td> * <td>N/A</td> @@ -52,6 +53,29 @@ import io.micrometer.core.instrument.MeterRegistry; * <td></td> * </tr> * <tr> + * <td>N/A</td> + * <td>N/A</td> + * <td>{@value #METRICS_COMPACTOR_ENTRIES_READ}</td> + * <td>FunctionCounter</td> + * <td>Number of entries read by all threads performing compactions</td> + * </tr> + * <tr> + * <td>N/A</td> + * <td>N/A</td> + * <td>{@value #METRICS_COMPACTOR_ENTRIES_WRITTEN}</td> + * <td>FunctionCounter</td> + * <td>Number of entries written by all threads performing compactions</td> + * </tr> + * <tr> + * <td>N/A</td> + * <td>N/A</td> + * <td>{@value #METRICS_COMPACTOR_BUSY}</td> + * <td>Gauge</td> + * <td>Indicates if the compactor is busy or not. The value will be 0 when idle and 1 when + * busy.</td> + * </tr> + * <!-- fate --> + * <tr> * <td>currentFateOps</td> * <td>Gauge</td> * <td>{@value #METRICS_FATE_OPS}</td> @@ -585,6 +609,7 @@ public interface MetricsProducer { String METRICS_COMPACTOR_MAJC_STUCK = METRICS_COMPACTOR_PREFIX + "majc.stuck"; String METRICS_COMPACTOR_ENTRIES_READ = METRICS_COMPACTOR_PREFIX + "entries.read"; String METRICS_COMPACTOR_ENTRIES_WRITTEN = METRICS_COMPACTOR_PREFIX + "entries.written"; + String METRICS_COMPACTOR_BUSY = METRICS_COMPACTOR_PREFIX + "busy"; String METRICS_FATE_PREFIX = "accumulo.fate."; String METRICS_FATE_TYPE_IN_PROGRESS = METRICS_FATE_PREFIX + "ops.in.progress.by.type"; diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 7f7509489a..b82358581a 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -120,6 +120,7 @@ import com.beust.jcommander.Parameter; import com.google.common.base.Preconditions; import io.micrometer.core.instrument.FunctionCounter; +import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.LongTaskTimer; import io.micrometer.core.instrument.MeterRegistry; @@ -193,6 +194,13 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac LongTaskTimer timer = LongTaskTimer.builder(METRICS_COMPACTOR_MAJC_STUCK) .description("Number and duration of stuck major compactions").register(registry); CompactionWatcher.setTimer(timer); + + Gauge + .builder(METRICS_COMPACTOR_BUSY, this.compactionRunning, + isRunning -> isRunning.get() ? 1 : 0) + .description( + "Indicates if the compactor is busy or not. The value will be 0 when idle and 1 when busy.") + .register(registry); } protected void startGCLogger(ScheduledThreadPoolExecutor schedExecutor) { diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java index 0cbbc3a97a..c57705807a 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java @@ -36,6 +36,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.accumulo.compactor.Compactor; @@ -79,6 +80,7 @@ import org.slf4j.LoggerFactory; public class ExternalCompactionProgressIT extends AccumuloClusterHarness { private static final Logger log = LoggerFactory.getLogger(ExternalCompactionProgressIT.class); private static final int ROWS = 10_000; + public static final int CHECKER_THREAD_SLEEP_MS = 1_000; enum EC_PROGRESS { STARTED, QUARTER, HALF, THREE_QUARTERS, INVALID @@ -121,6 +123,15 @@ public class ExternalCompactionProgressIT extends AccumuloClusterHarness { public void testProgressViaMetrics() throws Exception { String table = this.getUniqueNames(1)[0]; + final AtomicLong totalEntriesRead = new AtomicLong(0); + final AtomicLong totalEntriesWritten = new AtomicLong(0); + final AtomicInteger compactorBusy = new AtomicInteger(-1); + final long expectedEntriesRead = 9216; + final long expectedEntriesWritten = 4096; + + Thread checkerThread = + getMetricsCheckerThread(totalEntriesRead, totalEntriesWritten, compactorBusy); + try (AccumuloClient client = Accumulo.newClient().from(getCluster().getClientProperties()).build()) { createTable(client, table, "cs1"); @@ -129,12 +140,6 @@ public class ExternalCompactionProgressIT extends AccumuloClusterHarness { cluster.getClusterControl().startCompactors(Compactor.class, 1, QUEUE1); cluster.getClusterControl().startCoordinator(CompactionCoordinator.class); - final AtomicLong expectedEntriesRead = new AtomicLong(9216); - final AtomicLong expectedEntriesWritten = new AtomicLong(4096); - final AtomicLong totalEntriesRead = new AtomicLong(0); - final AtomicLong totalEntriesWritten = new AtomicLong(0); - - Thread checkerThread = getMetricsCheckerThread(totalEntriesRead, totalEntriesWritten); checkerThread.start(); IteratorSetting setting = new IteratorSetting(50, "Slow", SlowIterator.class); @@ -142,59 +147,80 @@ public class ExternalCompactionProgressIT extends AccumuloClusterHarness { client.tableOperations().attachIterator(table, setting, EnumSet.of(IteratorUtil.IteratorScope.majc)); log.info("Compacting table"); - compact(client, table, 2, QUEUE1, true); - log.info("Done Compacting table"); - verify(client, table, 2, ROWS); + + Wait.waitFor(() -> compactorBusy.get() == 0, 30_000, CHECKER_THREAD_SLEEP_MS, + "Compactor busy metric should be false initially"); + + compact(client, table, 2, QUEUE1, false); + + Wait.waitFor(() -> compactorBusy.get() == 1, 30_000, CHECKER_THREAD_SLEEP_MS, + "Compactor busy metric should be true after starting compaction"); Wait.waitFor(() -> { - if (totalEntriesRead.get() == expectedEntriesRead.get() - && totalEntriesWritten.get() == expectedEntriesWritten.get()) { + if (totalEntriesRead.get() == expectedEntriesRead + && totalEntriesWritten.get() == expectedEntriesWritten) { return true; } log.info( "Waiting for entries read to be {} (currently {}) and entries written to be {} (currently {})", - expectedEntriesRead.get(), totalEntriesRead.get(), expectedEntriesWritten.get(), + expectedEntriesRead, totalEntriesRead.get(), expectedEntriesWritten, totalEntriesWritten.get()); return false; - }, 30000, 3000, "Entries read and written metrics values did not match expected values"); + }, 30_000, CHECKER_THREAD_SLEEP_MS, + "Entries read and written metrics values did not match expected values"); + log.info("Done Compacting table"); + verify(client, table, 2, ROWS); + + Wait.waitFor(() -> compactorBusy.get() == 0, 30_000, CHECKER_THREAD_SLEEP_MS, + "Compactor busy metric should be false once compaction completes"); + } finally { stopCheckerThread.set(true); checkerThread.join(); + getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR); + getCluster().getClusterControl().stopAllServers(ServerType.COMPACTION_COORDINATOR); } } /** - * Get a thread that checks the metrics for entries read and written. + * Pulls metrics from the configured sink and updates the provided variables. * * @param totalEntriesRead this is set to the value of the entries read metric * @param totalEntriesWritten this is set to the value of the entries written metric + * @param compactorBusy this is set to the value of the compactor busy metric */ private static Thread getMetricsCheckerThread(AtomicLong totalEntriesRead, - AtomicLong totalEntriesWritten) { + AtomicLong totalEntriesWritten, AtomicInteger compactorBusy) { return Threads.createThread("metric-tailer", () -> { log.info("Starting metric tailer"); sink.getLines().clear(); - while (!stopCheckerThread.get()) { + out: while (!stopCheckerThread.get()) { List<String> statsDMetrics = sink.getLines(); for (String s : statsDMetrics) { if (stopCheckerThread.get()) { - break; + break out; + } + TestStatsDSink.Metric metric = TestStatsDSink.parseStatsDMetric(s); + if (!metric.getName().startsWith(MetricsProducer.METRICS_COMPACTOR_PREFIX)) { + continue; } - if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_ENTRIES_READ)) { - TestStatsDSink.Metric e = TestStatsDSink.parseStatsDMetric(s); - int value = Integer.parseInt(e.getValue()); - totalEntriesRead.addAndGet(value); - log.info("Found entries.read metric: {} with value: {}", e.getName(), value); - } else if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_ENTRIES_WRITTEN)) { - TestStatsDSink.Metric e = TestStatsDSink.parseStatsDMetric(s); - int value = Integer.parseInt(e.getValue()); - totalEntriesWritten.addAndGet(value); - log.info("Found entries.written metric: {} with value: {}", e.getName(), value); + int value = Integer.parseInt(metric.getValue()); + log.debug("Found metric: {} with value: {}", metric.getName(), value); + switch (metric.getName()) { + case MetricsProducer.METRICS_COMPACTOR_ENTRIES_READ: + totalEntriesRead.addAndGet(value); + break; + case MetricsProducer.METRICS_COMPACTOR_ENTRIES_WRITTEN: + totalEntriesWritten.addAndGet(value); + break; + case MetricsProducer.METRICS_COMPACTOR_BUSY: + compactorBusy.set(value); + break; } } - sleepUninterruptibly(3000, TimeUnit.MILLISECONDS); + sleepUninterruptibly(CHECKER_THREAD_SLEEP_MS, TimeUnit.MILLISECONDS); } log.info("Metric tailer thread finished"); }); @@ -310,7 +336,7 @@ public class ExternalCompactionProgressIT extends AccumuloClusterHarness { try { while (!stopCheckerThread.get()) { checkRunning(); - sleepUninterruptibly(1000, TimeUnit.MILLISECONDS); + sleepUninterruptibly(CHECKER_THREAD_SLEEP_MS, TimeUnit.MILLISECONDS); } } catch (TException e) { log.warn("{}", e.getMessage(), e);