This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 528711eb17 Add metric that indicates when compactor is busy (#4622)
528711eb17 is described below

commit 528711eb17d89e7d41b88658c22065f870b3c6ad
Author: Dom G <domgargu...@apache.org>
AuthorDate: Thu Jun 6 11:31:56 2024 -0400

    Add metric that indicates when compactor is busy (#4622)
    
    * Add metric that indicates when compactor is busy
    * Add metric descriptions to javadoc table
---
 .../accumulo/core/metrics/MetricsProducer.java     | 27 ++++++-
 .../org/apache/accumulo/compactor/Compactor.java   |  8 +++
 .../compaction/ExternalCompactionProgressIT.java   | 84 ++++++++++++++--------
 3 files changed, 89 insertions(+), 30 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
index 228baa2165..3fdb1a4309 100644
--- a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
+++ b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
@@ -36,7 +36,7 @@ import io.micrometer.core.instrument.MeterRegistry;
  * <a href="https://micrometer.io/docs/concepts#_naming_meters">naming convention</a> for the
  * metrics. The table below contains a mapping of the old to new metric names.
  * <table border="1">
- * <caption>Summary of Metric Changes</caption> <!-- fate -->
+ * <caption>Summary of Metric Changes</caption>
  * <tr>
  * <th>Old Name</th>
  * <th>Hadoop Metrics2 Type</th>
@@ -44,6 +44,7 @@ import io.micrometer.core.instrument.MeterRegistry;
  * <th>Micrometer Type</th>
  * <th>Notes</th>
  * </tr>
+ * <!-- compactor -->
  * <tr>
  * <td>N/A</td>
  * <td>N/A</td>
@@ -52,6 +53,29 @@ import io.micrometer.core.instrument.MeterRegistry;
  * <td></td>
  * </tr>
  * <tr>
+ * <td>N/A</td>
+ * <td>N/A</td>
+ * <td>{@value #METRICS_COMPACTOR_ENTRIES_READ}</td>
+ * <td>FunctionCounter</td>
+ * <td>Number of entries read by all threads performing compactions</td>
+ * </tr>
+ * <tr>
+ * <td>N/A</td>
+ * <td>N/A</td>
+ * <td>{@value #METRICS_COMPACTOR_ENTRIES_WRITTEN}</td>
+ * <td>FunctionCounter</td>
+ * <td>Number of entries written by all threads performing compactions</td>
+ * </tr>
+ * <tr>
+ * <td>N/A</td>
+ * <td>N/A</td>
+ * <td>{@value #METRICS_COMPACTOR_BUSY}</td>
+ * <td>Gauge</td>
+ * <td>Indicates if the compactor is busy or not. The value will be 0 when idle and 1 when
+ * busy.</td>
+ * </tr>
+ * <!-- fate -->
+ * <tr>
  * <td>currentFateOps</td>
  * <td>Gauge</td>
  * <td>{@value #METRICS_FATE_OPS}</td>
@@ -585,6 +609,7 @@ public interface MetricsProducer {
   String METRICS_COMPACTOR_MAJC_STUCK = METRICS_COMPACTOR_PREFIX + "majc.stuck";
   String METRICS_COMPACTOR_ENTRIES_READ = METRICS_COMPACTOR_PREFIX + "entries.read";
   String METRICS_COMPACTOR_ENTRIES_WRITTEN = METRICS_COMPACTOR_PREFIX + "entries.written";
+  String METRICS_COMPACTOR_BUSY = METRICS_COMPACTOR_PREFIX + "busy";
 
   String METRICS_FATE_PREFIX = "accumulo.fate.";
   String METRICS_FATE_TYPE_IN_PROGRESS = METRICS_FATE_PREFIX + "ops.in.progress.by.type";
diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 7f7509489a..b82358581a 100644
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -120,6 +120,7 @@ import com.beust.jcommander.Parameter;
 import com.google.common.base.Preconditions;
 
 import io.micrometer.core.instrument.FunctionCounter;
+import io.micrometer.core.instrument.Gauge;
 import io.micrometer.core.instrument.LongTaskTimer;
 import io.micrometer.core.instrument.MeterRegistry;
 
@@ -193,6 +194,13 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac
     LongTaskTimer timer = LongTaskTimer.builder(METRICS_COMPACTOR_MAJC_STUCK)
         .description("Number and duration of stuck major compactions").register(registry);
     CompactionWatcher.setTimer(timer);
+
+    Gauge
+        .builder(METRICS_COMPACTOR_BUSY, this.compactionRunning,
+            isRunning -> isRunning.get() ? 1 : 0)
+        .description(
+            "Indicates if the compactor is busy or not. The value will be 0 when idle and 1 when busy.")
+        .register(registry);
   }
 
   protected void startGCLogger(ScheduledThreadPoolExecutor schedExecutor) {
diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
index 0cbbc3a97a..c57705807a 100644
--- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
@@ -36,6 +36,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.accumulo.compactor.Compactor;
@@ -79,6 +80,7 @@ import org.slf4j.LoggerFactory;
 public class ExternalCompactionProgressIT extends AccumuloClusterHarness {
   private static final Logger log = LoggerFactory.getLogger(ExternalCompactionProgressIT.class);
   private static final int ROWS = 10_000;
+  public static final int CHECKER_THREAD_SLEEP_MS = 1_000;
 
   enum EC_PROGRESS {
     STARTED, QUARTER, HALF, THREE_QUARTERS, INVALID
@@ -121,6 +123,15 @@ public class ExternalCompactionProgressIT extends AccumuloClusterHarness {
   public void testProgressViaMetrics() throws Exception {
     String table = this.getUniqueNames(1)[0];
 
+    final AtomicLong totalEntriesRead = new AtomicLong(0);
+    final AtomicLong totalEntriesWritten = new AtomicLong(0);
+    final AtomicInteger compactorBusy = new AtomicInteger(-1);
+    final long expectedEntriesRead = 9216;
+    final long expectedEntriesWritten = 4096;
+
+    Thread checkerThread =
+        getMetricsCheckerThread(totalEntriesRead, totalEntriesWritten, compactorBusy);
+
     try (AccumuloClient client =
         Accumulo.newClient().from(getCluster().getClientProperties()).build()) {
       createTable(client, table, "cs1");
@@ -129,12 +140,6 @@ public class ExternalCompactionProgressIT extends AccumuloClusterHarness {
       cluster.getClusterControl().startCompactors(Compactor.class, 1, QUEUE1);
       cluster.getClusterControl().startCoordinator(CompactionCoordinator.class);
 
-      final AtomicLong expectedEntriesRead = new AtomicLong(9216);
-      final AtomicLong expectedEntriesWritten = new AtomicLong(4096);
-      final AtomicLong totalEntriesRead = new AtomicLong(0);
-      final AtomicLong totalEntriesWritten = new AtomicLong(0);
-
-      Thread checkerThread = getMetricsCheckerThread(totalEntriesRead, totalEntriesWritten);
       checkerThread.start();
 
       IteratorSetting setting = new IteratorSetting(50, "Slow", SlowIterator.class);
@@ -142,59 +147,80 @@ public class ExternalCompactionProgressIT extends AccumuloClusterHarness {
       client.tableOperations().attachIterator(table, setting,
           EnumSet.of(IteratorUtil.IteratorScope.majc));
       log.info("Compacting table");
-      compact(client, table, 2, QUEUE1, true);
-      log.info("Done Compacting table");
-      verify(client, table, 2, ROWS);
+
+      Wait.waitFor(() -> compactorBusy.get() == 0, 30_000, CHECKER_THREAD_SLEEP_MS,
+          "Compactor busy metric should be false initially");
+
+      compact(client, table, 2, QUEUE1, false);
+
+      Wait.waitFor(() -> compactorBusy.get() == 1, 30_000, CHECKER_THREAD_SLEEP_MS,
+          "Compactor busy metric should be true after starting compaction");
 
       Wait.waitFor(() -> {
-        if (totalEntriesRead.get() == expectedEntriesRead.get()
-            && totalEntriesWritten.get() == expectedEntriesWritten.get()) {
+        if (totalEntriesRead.get() == expectedEntriesRead
+            && totalEntriesWritten.get() == expectedEntriesWritten) {
           return true;
         }
         log.info(
             "Waiting for entries read to be {} (currently {}) and entries written to be {} (currently {})",
-            expectedEntriesRead.get(), totalEntriesRead.get(), expectedEntriesWritten.get(),
+            expectedEntriesRead, totalEntriesRead.get(), expectedEntriesWritten,
             totalEntriesWritten.get());
         return false;
-      }, 30000, 3000, "Entries read and written metrics values did not match expected values");
+      }, 30_000, CHECKER_THREAD_SLEEP_MS,
+          "Entries read and written metrics values did not match expected values");
 
+      log.info("Done Compacting table");
+      verify(client, table, 2, ROWS);
+
+      Wait.waitFor(() -> compactorBusy.get() == 0, 30_000, CHECKER_THREAD_SLEEP_MS,
+          "Compactor busy metric should be false once compaction completes");
+    } finally {
       stopCheckerThread.set(true);
       checkerThread.join();
+      getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
+      getCluster().getClusterControl().stopAllServers(ServerType.COMPACTION_COORDINATOR);
     }
   }
 
   /**
-   * Get a thread that checks the metrics for entries read and written.
+   * Pulls metrics from the configured sink and updates the provided variables.
    *
    * @param totalEntriesRead this is set to the value of the entries read metric
    * @param totalEntriesWritten this is set to the value of the entries written metric
+   * @param compactorBusy this is set to the value of the compactor busy metric
    */
   private static Thread getMetricsCheckerThread(AtomicLong totalEntriesRead,
-      AtomicLong totalEntriesWritten) {
+      AtomicLong totalEntriesWritten, AtomicInteger compactorBusy) {
     return Threads.createThread("metric-tailer", () -> {
       log.info("Starting metric tailer");
 
       sink.getLines().clear();
 
-      while (!stopCheckerThread.get()) {
+      out: while (!stopCheckerThread.get()) {
         List<String> statsDMetrics = sink.getLines();
         for (String s : statsDMetrics) {
           if (stopCheckerThread.get()) {
-            break;
+            break out;
+          }
+          TestStatsDSink.Metric metric = TestStatsDSink.parseStatsDMetric(s);
+          if (!metric.getName().startsWith(MetricsProducer.METRICS_COMPACTOR_PREFIX)) {
+            continue;
           }
-          if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_ENTRIES_READ)) {
-            TestStatsDSink.Metric e = TestStatsDSink.parseStatsDMetric(s);
-            int value = Integer.parseInt(e.getValue());
-            totalEntriesRead.addAndGet(value);
-            log.info("Found entries.read metric: {} with value: {}", e.getName(), value);
-          } else if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_ENTRIES_WRITTEN)) {
-            TestStatsDSink.Metric e = TestStatsDSink.parseStatsDMetric(s);
-            int value = Integer.parseInt(e.getValue());
-            totalEntriesWritten.addAndGet(value);
-            log.info("Found entries.written metric: {} with value: {}", e.getName(), value);
+          int value = Integer.parseInt(metric.getValue());
+          log.debug("Found metric: {} with value: {}", metric.getName(), value);
+          switch (metric.getName()) {
+            case MetricsProducer.METRICS_COMPACTOR_ENTRIES_READ:
+              totalEntriesRead.addAndGet(value);
+              break;
+            case MetricsProducer.METRICS_COMPACTOR_ENTRIES_WRITTEN:
+              totalEntriesWritten.addAndGet(value);
+              break;
+            case MetricsProducer.METRICS_COMPACTOR_BUSY:
+              compactorBusy.set(value);
+              break;
           }
         }
-        sleepUninterruptibly(3000, TimeUnit.MILLISECONDS);
+        sleepUninterruptibly(CHECKER_THREAD_SLEEP_MS, TimeUnit.MILLISECONDS);
       }
       log.info("Metric tailer thread finished");
     });
@@ -310,7 +336,7 @@ public class ExternalCompactionProgressIT extends AccumuloClusterHarness {
       try {
         while (!stopCheckerThread.get()) {
           checkRunning();
-          sleepUninterruptibly(1000, TimeUnit.MILLISECONDS);
+          sleepUninterruptibly(CHECKER_THREAD_SLEEP_MS, TimeUnit.MILLISECONDS);
         }
       } catch (TException e) {
         log.warn("{}", e.getMessage(), e);

Reply via email to