This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 41d382638a Fix MetricsIT and compaction queue metrics (#4069)
41d382638a is described below

commit 41d382638a2e8b776145ac831408e49c27d576f7
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Thu Dec 14 15:33:22 2023 -0500

    Fix MetricsIT and compaction queue metrics (#4069)
    
    Retain references to the Gauges that are created
    for each compaction queue so that they can be
    removed when the queue is removed. Added
    new constant in MetricsProducer for compaction
    queues that is used in MetricsIT.  Refactored
    MetricsIT test to ensure that compaction queue
    metrics are seen before stopping MAC.
---
 .../accumulo/core/metrics/MetricsProducer.java     |  14 +--
 .../compaction/queue/CompactionJobQueues.java      |   4 +
 .../accumulo/manager/metrics/QueueMetrics.java     | 131 ++++++++++++++-------
 .../apache/accumulo/test/metrics/MetricsIT.java    |  65 +++++++++-
 4 files changed, 162 insertions(+), 52 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java 
b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
index 8c785267f2..1dd7fcfd99 100644
--- a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
+++ b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
@@ -660,17 +660,17 @@ public interface MetricsProducer {
   String METRICS_LOW_MEMORY = "accumulo.detected.low.memory";
   String METRICS_COMPACTOR_PREFIX = "accumulo.compactor.";
   String METRICS_COMPACTOR_MAJC_STUCK = METRICS_COMPACTOR_PREFIX + 
"majc.stuck";
-  String METRICS_COMPACTOR_JOB_PRIORITY_QUEUES = METRICS_COMPACTOR_PREFIX + 
"queue.count";
-  String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH = 
METRICS_COMPACTOR_PREFIX + "queue.length";
+  String METRICS_COMPACTOR_QUEUE_PREFIX = METRICS_COMPACTOR_PREFIX + "queue.";
+  String METRICS_COMPACTOR_JOB_PRIORITY_QUEUES = 
METRICS_COMPACTOR_QUEUE_PREFIX + "count";
+  String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH = 
METRICS_COMPACTOR_QUEUE_PREFIX + "length";
   String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_DEQUEUED =
-      METRICS_COMPACTOR_PREFIX + "queue.jobs.dequeued";
+      METRICS_COMPACTOR_QUEUE_PREFIX + "jobs.dequeued";
   String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED =
-      METRICS_COMPACTOR_PREFIX + "queue.jobs.queued";
+      METRICS_COMPACTOR_QUEUE_PREFIX + "jobs.queued";
   String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED =
-      METRICS_COMPACTOR_PREFIX + "queue.jobs.rejected";
-
+      METRICS_COMPACTOR_QUEUE_PREFIX + "jobs.rejected";
   String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY =
-      METRICS_COMPACTOR_PREFIX + "queue.jobs.priority";
+      METRICS_COMPACTOR_QUEUE_PREFIX + "jobs.priority";
 
   String METRICS_FATE_PREFIX = "accumulo.fate.";
   String METRICS_FATE_TYPE_IN_PROGRESS = METRICS_FATE_PREFIX + 
"ops.in.progress.by.type";
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
index 0ff572cb8a..f6090effd2 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
@@ -62,6 +62,10 @@ public class CompactionJobQueues {
     return priorityQueues.keySet();
   }
 
+  public CompactionJobPriorityQueue getQueue(CompactionExecutorId executorId) {
+    return priorityQueues.get(executorId); // returns null if executorId has no queue
+  }
+
   public long getQueueMaxSize(CompactionExecutorId executorId) {
     var prioQ = priorityQueues.get(executorId);
     return prioQ == null ? 0 : prioQ.getMaxSize();
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/QueueMetrics.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/QueueMetrics.java
index d73fefc64f..87925ceb10 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/QueueMetrics.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/QueueMetrics.java
@@ -18,27 +18,89 @@
  */
 package org.apache.accumulo.manager.metrics;
 
-import static org.apache.accumulo.core.metrics.MetricsUtil.formatString;
 import static org.apache.accumulo.core.metrics.MetricsUtil.getCommonTags;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.accumulo.core.metrics.MetricsProducer;
 import org.apache.accumulo.core.spi.compaction.CompactionExecutorId;
 import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.manager.compaction.queue.CompactionJobPriorityQueue;
 import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Sets;
+import com.google.common.collect.Sets.SetView;
 
 import io.micrometer.core.instrument.Gauge;
 import io.micrometer.core.instrument.MeterRegistry;
 import io.micrometer.core.instrument.Tags;
 
 public class QueueMetrics implements MetricsProducer {
+
+  private static class QueueMeters { // one queue's gauges, kept so they can be removed later
+    private final Gauge length;
+    private final Gauge jobsQueued;
+    private final Gauge jobsDequeued;
+    private final Gauge jobsRejected;
+    private final Gauge jobsLowestPriority;
+
+    public QueueMeters(MeterRegistry meterRegistry, CompactionExecutorId queueId,
+        CompactionJobPriorityQueue queue) {
+      length = // NOTE(review): reports getMaxSize(); confirm that is the intended "length"
+          Gauge.builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH, queue, q -> q.getMaxSize())
+              .description("Length of priority queues")
+              .tags(Tags.concat(getCommonTags(), "queue.id", queueId.canonical()))
+              .register(meterRegistry);
+
+      jobsQueued = Gauge
+          .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED, queue, q -> q.getQueuedJobs())
+          .description("Count of queued jobs")
+          .tags(Tags.concat(getCommonTags(), "queue.id", queueId.canonical()))
+          .register(meterRegistry);
+
+      jobsDequeued = Gauge
+          .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_DEQUEUED, queue,
+              q -> q.getDequeuedJobs())
+          .description("Count of jobs dequeued")
+          .tags(Tags.concat(getCommonTags(), "queue.id", queueId.canonical()))
+          .register(meterRegistry);
+
+      jobsRejected = Gauge
+          .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED, queue,
+              q -> q.getRejectedJobs())
+          .description("Count of rejected jobs")
+          .tags(Tags.concat(getCommonTags(), "queue.id", queueId.canonical()))
+          .register(meterRegistry);
+
+      jobsLowestPriority = Gauge
+          .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY, queue,
+              q -> q.getLowestPriority())
+          .description("Lowest priority queued job")
+          .tags(Tags.concat(getCommonTags(), "queue.id", queueId.canonical()))
+          .register(meterRegistry);
+    }
+
+    private void removeMeters(MeterRegistry registry) { // unregisters all five gauges
+      registry.remove(length);
+      registry.remove(jobsQueued);
+      registry.remove(jobsDequeued);
+      registry.remove(jobsRejected);
+      registry.remove(jobsLowestPriority);
+    }
+  }
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(QueueMetrics.class);
   private static final long DEFAULT_MIN_REFRESH_DELAY = 
TimeUnit.SECONDS.toMillis(5);
   private MeterRegistry meterRegistry = null;
   private final CompactionJobQueues compactionJobQueues;
-  private AtomicLong queueCount;
+  private final Map<CompactionExecutorId,QueueMeters> perQueueMetrics = new 
HashMap<>();
+  private Gauge queueCountMeter = null;
 
   public QueueMetrics(CompactionJobQueues compactionJobQueues) {
     this.compactionJobQueues = compactionJobQueues;
@@ -51,46 +113,35 @@ public class QueueMetrics implements MetricsProducer {
 
   public void update() {
 
-    Gauge
-        .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUES, compactionJobQueues,
-            CompactionJobQueues::getQueueCount)
-        .description("Number of current Queues").tags(getCommonTags()).register(meterRegistry);
-
-    for (CompactionExecutorId ceid : compactionJobQueues.getQueueIds()) {
-      // Normalize the queueId to match metrics tag naming convention.
-      String queueId = formatString(ceid.toString());
-
-      // Register queues by ID rather than by object as queues can be deleted.
-      Gauge
-          .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH, ceid,
-              compactionJobQueues::getQueueMaxSize)
-          .description("Length of priority queues")
-          .tags(Tags.concat(getCommonTags(), "queue.id", queueId)).register(meterRegistry);
-
-      Gauge
-          .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED, ceid,
-              compactionJobQueues::getQueuedJobs)
-          .description("Count of queued jobs")
-          .tags(Tags.concat(getCommonTags(), "queue.id", queueId)).register(meterRegistry);
+    if (queueCountMeter == null) {
+      queueCountMeter = Gauge
+          .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUES, compactionJobQueues,
+              CompactionJobQueues::getQueueCount)
+          .description("Number of current Queues").tags(getCommonTags()).register(meterRegistry);
+    }
+    // Register meters for new queues and remove meters for queues that no longer exist.
 
-      Gauge
-          .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_DEQUEUED, ceid,
-              compactionJobQueues::getDequeuedJobs)
-          .description("Count of jobs dequeued")
-          .tags(Tags.concat(getCommonTags(), "queue.id", queueId)).register(meterRegistry);
+    Set<CompactionExecutorId> definedQueues = compactionJobQueues.getQueueIds();
+    LOG.debug("update - defined queues: {}", definedQueues);
 
-      Gauge
-          .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED, ceid,
-              compactionJobQueues::getRejectedJobs)
-          .description("Count of rejected jobs")
-          .tags(Tags.concat(getCommonTags(), "queue.id", queueId)).register(meterRegistry);
+    Set<CompactionExecutorId> queuesWithMetrics = perQueueMetrics.keySet();
+    LOG.debug("update - queues with metrics: {}", queuesWithMetrics);
+
+    SetView<CompactionExecutorId> queuesWithoutMetrics =
+        Sets.difference(definedQueues, queuesWithMetrics);
+    queuesWithoutMetrics.immutableCopy().forEach(q -> {
+      LOG.debug("update - creating meters for queue: {}", q);
+      perQueueMetrics.put(q, new QueueMeters(meterRegistry, q, compactionJobQueues.getQueue(q)));
+    });
+
+    SetView<CompactionExecutorId> metricsWithoutQueues =
+        Sets.difference(queuesWithMetrics, definedQueues);
+    // Snapshot first: the view is backed by perQueueMetrics, which the loop modifies.
+    metricsWithoutQueues.immutableCopy().forEach(q -> {
+      LOG.debug("update - removing meters for queue: {}", q);
+      perQueueMetrics.remove(q).removeMeters(meterRegistry);
+    });
 
-      Gauge
-          .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY, ceid,
-              compactionJobQueues::getLowestPriority)
-          .description("Lowest priority queued job")
-          .tags(Tags.concat(getCommonTags(), "queue.id", queueId)).register(meterRegistry);
-    }
   }
 
   @Override
diff --git a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java 
b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
index a9d351dc52..b68159877b 100644
--- a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
@@ -21,22 +21,26 @@ package org.apache.accumulo.test.metrics;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
 import java.time.Duration;
+import java.util.BitSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Predicate;
 
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.admin.CompactionConfig;
 import org.apache.accumulo.core.conf.Property;
@@ -45,6 +49,7 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.metrics.MetricsProducer;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
 import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.accumulo.test.functional.SlowIterator;
 import org.apache.accumulo.test.metrics.TestStatsDSink.Metric;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
@@ -60,7 +65,7 @@ public class MetricsIT extends ConfigurableMacBase implements 
MetricsProducer {
 
   @Override
   protected Duration defaultTimeout() {
-    return Duration.ofMinutes(1);
+    return Duration.ofMinutes(3); // was 1 min; metrics are now polled while work runs
   }
 
   @BeforeAll
@@ -92,9 +97,6 @@ public class MetricsIT extends ConfigurableMacBase implements 
MetricsProducer {
   @Test
   public void confirmMetricsPublished() throws Exception {
 
-    doWorkToGenerateMetrics();
-    cluster.stop();
-
     Set<String> unexpectedMetrics = Set.of(METRICS_SCAN_YIELDS, 
METRICS_UPDATE_ERRORS,
         METRICS_SCAN_BUSY_TIMEOUT, METRICS_SCAN_PAUSED_FOR_MEM, 
METRICS_SCAN_RETURN_FOR_MEM,
         METRICS_MINC_PAUSED, METRICS_MAJC_PAUSED);
@@ -114,8 +116,30 @@ public class MetricsIT extends ConfigurableMacBase implements MetricsProducer {
 
     List<String> statsDMetrics;
 
+    final int compactionPriorityQueueLengthBit = 0;
+    final int compactionPriorityQueueQueuedBit = 1;
+    final int compactionPriorityQueueDequeuedBit = 2;
+    final int compactionPriorityQueueRejectedBit = 3;
+    final int compactionPriorityQueuePriorityBit = 4;
+
+    final BitSet trueSet = new BitSet(5);
+    trueSet.set(0, 5, true); // toIndex is exclusive: sets bits 0-4, one per queue metric
+
+    final BitSet queueMetricsSeen = new BitSet(5);
+
+    AtomicReference<Exception> error = new AtomicReference<>();
+    Thread workerThread = new Thread(() -> {
+      try {
+        doWorkToGenerateMetrics();
+      } catch (Exception e) {
+        error.set(e);
+      }
+    });
+    workerThread.start();
+
     // loop until we run out of lines or until we see all expected metrics
-    while (!(statsDMetrics = sink.getLines()).isEmpty() && !expectedMetricNames.isEmpty()) {
+    while (!(statsDMetrics = sink.getLines()).isEmpty() && !expectedMetricNames.isEmpty()
+        && !queueMetricsSeen.equals(trueSet)) {
       // for each metric name not yet seen, check if it is expected, flaky, or unknown
       statsDMetrics.stream().filter(line -> line.startsWith("accumulo"))
           .map(TestStatsDSink::parseStatsDMetric).map(Metric::getName)
@@ -126,14 +150,37 @@ public class MetricsIT extends ConfigurableMacBase implements MetricsProducer {
             } else if (flakyMetrics.contains(name)) {
               // ignore any flaky metric names seen
               // these aren't always expected, but we shouldn't be surprised if we see them
+            } else if (name.startsWith(METRICS_COMPACTOR_QUEUE_PREFIX)) {
+              // Compactor queue metrics are not guaranteed to be emitted while
+              // doWorkToGenerateMetrics runs in the worker thread. Flip a bit in the
+              // BitSet as each queue metric is seen; the top-level loop checks the
+              // BitSet to decide when it can stop.
+              seenMetricNames.put(name, expectedMetricNames.remove(name));
+              if (METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH.equals(name)) {
+                queueMetricsSeen.set(compactionPriorityQueueLengthBit, true);
+              } else if (METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED.equals(name)) {
+                queueMetricsSeen.set(compactionPriorityQueueQueuedBit, true);
+              } else if (METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_DEQUEUED.equals(name)) {
+                queueMetricsSeen.set(compactionPriorityQueueDequeuedBit, true);
+              } else if (METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED.equals(name)) {
+                queueMetricsSeen.set(compactionPriorityQueueRejectedBit, true);
+              } else if (METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY.equals(name)) {
+                queueMetricsSeen.set(compactionPriorityQueuePriorityBit, true);
+              }
             } else {
               // completely unexpected metric
               fail("Found accumulo metric not in expectedMetricNames or flakyMetricNames: " + name);
             }
           });
+      Thread.sleep(4_000);
     }
     assertTrue(expectedMetricNames.isEmpty(),
         "Did not see all expected metric names, missing: " + expectedMetricNames.values());
+
+    workerThread.join();
+    assertNull(error.get(), () -> "doWorkToGenerateMetrics failed: " + error.get());
+    cluster.stop();
+
   }
 
   private void doWorkToGenerateMetrics() throws Exception {
@@ -167,6 +214,14 @@ public class MetricsIT extends ConfigurableMacBase implements MetricsProducer {
       try (Scanner scanner = client.createScanner(tableName)) {
         scanner.forEach((k, v) -> {});
       }
+      // Start a compaction with the slow iterator to ensure that the compaction queues
+      // are not removed quickly
+      CompactionConfig cc = new CompactionConfig();
+      IteratorSetting is = new IteratorSetting(100, "slow", SlowIterator.class);
+      SlowIterator.setSleepTime(is, 3000);
+      cc.setIterators(List.of(is));
+      cc.setWait(false);
+      client.tableOperations().compact(tableName, cc);
       client.tableOperations().delete(tableName);
       while (client.tableOperations().exists(tableName)) {
         Thread.sleep(1000);

Reply via email to