This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new ab5c57eb3c Add metrics to track time compaction jobs are queued (#4980)
ab5c57eb3c is described below

commit ab5c57eb3c25d202133fa8e1e818115e7855accc
Author: Christopher L. Shannon <cshan...@apache.org>
AuthorDate: Fri Oct 18 16:39:47 2024 -0400

    Add metrics to track time compaction jobs are queued (#4980)
    
    This adds 2 new groups of stats to track information about queued
    compaction jobs. The first stat is a timer that keeps track of when jobs
    are being polled and give information on how often/fast jobs are
    exiting the queue. The second group of stats is a min/max/avg
    and is tracking age information about how long jobs are waiting
    on the queue.
    
    This closes #4945
---
 .../org/apache/accumulo/core/metrics/Metric.java   |  12 +++
 .../coordinator/CompactionCoordinator.java         |   2 +-
 .../compaction/coordinator/QueueMetrics.java       |  36 +++++++
 .../queue/CompactionJobPriorityQueue.java          | 106 +++++++++++++++++++--
 .../queue/CompactionJobPriorityQueueTest.java      |  40 +++++++-
 .../compaction/queue/CompactionJobQueuesTest.java  |   8 ++
 .../compaction/ExternalCompactionMetricsIT.java    |  28 +++++-
 7 files changed, 219 insertions(+), 13 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java 
b/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java
index 92690123c6..7050b73347 100644
--- a/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java
+++ b/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java
@@ -51,6 +51,18 @@ public enum Metric {
       MetricType.GAUGE, "Count of rejected jobs.", MetricCategory.COMPACTOR),
   
COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY("accumulo.compactor.queue.jobs.priority",
       MetricType.GAUGE, "Lowest priority queued job.", 
MetricCategory.COMPACTOR),
+  
COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_MIN_AGE("accumulo.compactor.queue.jobs.min.age",
+      MetricType.GAUGE, "Minimum age of currently queued jobs in seconds.",
+      MetricCategory.COMPACTOR),
+  
COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_MAX_AGE("accumulo.compactor.queue.jobs.max.age",
+      MetricType.GAUGE, "Maximum age of currently queued jobs in seconds.",
+      MetricCategory.COMPACTOR),
+  
COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_AVG_AGE("accumulo.compactor.queue.jobs.avg.age",
+      MetricType.GAUGE, "Average age of currently queued jobs in seconds.",
+      MetricCategory.COMPACTOR),
+  
COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_POLL_TIMER("accumulo.compactor.queue.jobs.exit.time",
+      MetricType.TIMER, "Tracks time a job spent in the queue before exiting 
the queue.",
+      MetricCategory.COMPACTOR),
 
   // Fate Metrics
   FATE_TYPE_IN_PROGRESS("accumulo.fate.ops.in.progress.by.type", 
MetricType.GAUGE,
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index 272395354d..3de05a7578 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -1049,7 +1049,7 @@ public class CompactionCoordinator
           // associated priority queue of jobs
           CompactionJobPriorityQueue queue = getJobQueues().getQueue(cgid);
           if (queue != null) {
-            queue.clear();
+            queue.clearIfInactive(Duration.ofMinutes(10));
             queue.setMaxSize(this.jobQueueInitialSize);
           }
         } else {
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/QueueMetrics.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/QueueMetrics.java
index eb2b9800c2..dd2705eb18 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/QueueMetrics.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/QueueMetrics.java
@@ -19,7 +19,11 @@
 package org.apache.accumulo.manager.compaction.coordinator;
 
 import static 
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUES;
+import static 
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_AVG_AGE;
 import static 
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_DEQUEUED;
+import static 
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_MAX_AGE;
+import static 
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_MIN_AGE;
+import static 
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_POLL_TIMER;
 import static 
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY;
 import static 
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED;
 import static 
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED;
@@ -48,6 +52,7 @@ import com.google.common.collect.Sets.SetView;
 import io.micrometer.core.instrument.Gauge;
 import io.micrometer.core.instrument.MeterRegistry;
 import io.micrometer.core.instrument.Tag;
+import io.micrometer.core.instrument.Timer;
 
 public class QueueMetrics implements MetricsProducer {
 
@@ -57,6 +62,10 @@ public class QueueMetrics implements MetricsProducer {
     private final Gauge jobsDequeued;
     private final Gauge jobsRejected;
     private final Gauge jobsLowestPriority;
+    private final Gauge jobsMinAge;
+    private final Gauge jobsMaxAge;
+    private final Gauge jobsAvgAge;
+    private final Timer jobsQueueTimer;
 
     public QueueMeters(MeterRegistry meterRegistry, CompactorGroupId cgid,
         CompactionJobPriorityQueue queue) {
@@ -90,6 +99,29 @@ public class QueueMetrics implements MetricsProducer {
               q -> q.getLowestPriority())
           
.description(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY.getDescription())
           .tags(List.of(Tag.of("queue.id", queueId))).register(meterRegistry);
+
+      jobsMinAge = Gauge
+          .builder(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_MIN_AGE.getName(), queue,
+              q -> q.getJobQueueStats().getMinAge().toSeconds())
+          
.description(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_MIN_AGE.getDescription())
+          .tags(List.of(Tag.of("queue.id", queueId))).register(meterRegistry);
+
+      jobsMaxAge = Gauge
+          .builder(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_MAX_AGE.getName(), queue,
+              q -> q.getJobQueueStats().getMaxAge().toSeconds())
+          
.description(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_MAX_AGE.getDescription())
+          .tags(List.of(Tag.of("queue.id", queueId))).register(meterRegistry);
+
+      jobsAvgAge = 
Gauge.builder(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_AVG_AGE.getName(), queue,
+          // Divide by 1000.0 instead of using toSeconds() so we get a double
+          q -> q.getJobQueueStats().getAvgAge().toMillis() / 1000.0)
+          
.description(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_AVG_AGE.getDescription())
+          .tags(List.of(Tag.of("queue.id", queueId))).register(meterRegistry);
+
+      jobsQueueTimer = 
Timer.builder(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_POLL_TIMER.getName())
+          
.description(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_POLL_TIMER.getDescription())
+          .tags(List.of(Tag.of("queue.id", queueId))).register(meterRegistry);
+      queue.setJobQueueTimerCallback(jobsQueueTimer);
     }
 
     private void removeMeters(MeterRegistry registry) {
@@ -98,6 +130,10 @@ public class QueueMetrics implements MetricsProducer {
       registry.remove(jobsDequeued);
       registry.remove(jobsRejected);
       registry.remove(jobsLowestPriority);
+      registry.remove(jobsMinAge);
+      registry.remove(jobsMaxAge);
+      registry.remove(jobsAvgAge);
+      registry.remove(jobsQueueTimer);
     }
   }
 
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
index 9e2ddbc96a..2099dbed6d 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.manager.compaction.queue;
 
 import static com.google.common.base.Preconditions.checkState;
 
+import java.time.Duration;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -28,23 +29,31 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.metadata.schema.Ample;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.spi.compaction.CompactionJob;
 import org.apache.accumulo.core.spi.compaction.CompactorGroupId;
+import org.apache.accumulo.core.util.Stat;
+import org.apache.accumulo.core.util.Timer;
 import org.apache.accumulo.core.util.compaction.CompactionJobPrioritizer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
 
 /**
  * Priority Queue for {@link CompactionJob}s that supports a maximum size. 
When a job is added and
@@ -113,6 +122,9 @@ public class CompactionJobPriorityQueue {
   private final AtomicLong dequeuedJobs;
   private final ArrayDeque<CompletableFuture<CompactionJobQueues.MetaJob>> 
futures;
   private long futuresAdded = 0;
+  private final Map<KeyExtent,Timer> jobAges;
+  private final Supplier<CompactionJobPriorityQueueStats> jobQueueStats;
+  private final AtomicReference<Optional<io.micrometer.core.instrument.Timer>> 
jobQueueTimer;
 
   private static class TabletJobs {
     final long generation;
@@ -138,6 +150,10 @@ public class CompactionJobPriorityQueue {
     this.rejectedJobs = new AtomicLong(0);
     this.dequeuedJobs = new AtomicLong(0);
     this.futures = new ArrayDeque<>();
+    this.jobAges = new ConcurrentHashMap<>();
+    this.jobQueueStats = Suppliers.memoizeWithExpiration(
+        () -> new CompactionJobPriorityQueueStats(jobAges), 5, 
TimeUnit.SECONDS);
+    this.jobQueueTimer = new AtomicReference<>(Optional.empty());
   }
 
   public synchronized void removeOlderGenerations(Ample.DataLevel level, long 
currGeneration) {
@@ -154,7 +170,8 @@ public class CompactionJobPriorityQueue {
           removals.size(), groupId, level);
     }
 
-    removals.forEach(this::removePreviousSubmissions);
+    // Also clears jobAge timer for tablets that do not need compaction anymore
+    removals.forEach(ke -> removePreviousSubmissions(ke, true));
   }
 
   /**
@@ -164,7 +181,10 @@ public class CompactionJobPriorityQueue {
       long generation) {
     Preconditions.checkArgument(jobs.stream().allMatch(job -> 
job.getGroup().equals(groupId)));
 
-    removePreviousSubmissions(tabletMetadata.getExtent());
+    // Do not clear jobAge timers, they are cleared later at the end of this 
method
+    // if there are no jobs for the extent so we do not reset the timer for an 
extent
+    // that had previous jobs and still has jobs
+    removePreviousSubmissions(tabletMetadata.getExtent(), false);
 
     HashSet<CjpqKey> newEntries = new HashSet<>(jobs.size());
 
@@ -175,9 +195,14 @@ public class CompactionJobPriorityQueue {
         // its expected that if futures are present then the queue is empty, 
if this is not true
         // then there is a bug
         Preconditions.checkState(jobQueue.isEmpty());
+        // Queue should be empty so jobAges should be empty
+        Preconditions.checkState(jobAges.isEmpty());
         if (future.complete(new CompactionJobQueues.MetaJob(job, 
tabletMetadata))) {
           // successfully completed a future with this job, so do not need to 
queue the job
           jobsAdded++;
+          // Record a time of 0 as job as we were able to complete immediately 
and there
+          // were no jobs waiting
+          jobQueueTimer.get().ifPresent(jqt -> jqt.record(Duration.ZERO));
           continue outer;
         } // else the future was canceled or timed out so could not complete it
         future = futures.poll();
@@ -197,6 +222,9 @@ public class CompactionJobPriorityQueue {
     if (!newEntries.isEmpty()) {
       checkState(tabletJobs.put(tabletMetadata.getExtent(), new 
TabletJobs(generation, newEntries))
           == null);
+      jobAges.computeIfAbsent(tabletMetadata.getExtent(), e -> 
Timer.startNew());
+    } else {
+      jobAges.remove(tabletMetadata.getExtent());
     }
 
     return jobsAdded;
@@ -235,10 +263,19 @@ public class CompactionJobPriorityQueue {
     if (first != null) {
       dequeuedJobs.getAndIncrement();
       var extent = first.getValue().getTabletMetadata().getExtent();
-      Set<CjpqKey> jobs = tabletJobs.get(extent).jobs;
+      var timer = jobAges.get(extent);
+      checkState(timer != null);
+      jobQueueTimer.get().ifPresent(jqt -> jqt.record(timer.elapsed()));
+      log.trace("Compaction job age for {} is {} ms", extent, 
timer.elapsed(TimeUnit.MILLISECONDS));
+      Set<CompactionJobPriorityQueue.CjpqKey> jobs = 
tabletJobs.get(extent).jobs;
       checkState(jobs.remove(first.getKey()));
+      // If there are no more jobs for this extent we can remove the timer, 
otherwise
+      // we need to reset it
       if (jobs.isEmpty()) {
         tabletJobs.remove(extent);
+        jobAges.remove(extent);
+      } else {
+        timer.restart();
       }
     }
     return first == null ? null : first.getValue();
@@ -280,11 +317,15 @@ public class CompactionJobPriorityQueue {
     return firstEntry == null ? null : firstEntry.getValue();
   }
 
-  private void removePreviousSubmissions(KeyExtent extent) {
-    TabletJobs prevJobs = tabletJobs.get(extent);
+  private void removePreviousSubmissions(KeyExtent extent, boolean 
removeJobAges) {
+    CompactionJobPriorityQueue.TabletJobs prevJobs = tabletJobs.get(extent);
     if (prevJobs != null) {
       prevJobs.jobs.forEach(jobQueue::remove);
       tabletJobs.remove(extent);
+      if (removeJobAges) {
+        jobAges.remove(extent);
+        log.trace("Removed jobAge timer for tablet {} that no longer needs 
compaction", extent);
+      }
     }
   }
 
@@ -302,7 +343,6 @@ public class CompactionJobPriorityQueue {
           rejectedJobs.getAndIncrement();
         }
       }
-
     }
 
     var key = new CjpqKey(job);
@@ -310,8 +350,56 @@ public class CompactionJobPriorityQueue {
     return key;
   }
 
-  public synchronized void clear() {
-    jobQueue.clear();
-    tabletJobs.clear();
+  public synchronized void clearIfInactive(Duration duration) {
+    // IF the minimum age of jobs in the queue is older than the
+    // duration then clear all the maps as this queue is now
+    // considered inactive
+    if (getJobQueueStats().getMinAge().compareTo(duration) > 0) {
+      jobQueue.clear();
+      tabletJobs.clear();
+      jobAges.clear();
+    }
+  }
+
+  public CompactionJobPriorityQueueStats getJobQueueStats() {
+    return jobQueueStats.get();
+  }
+
+  public void setJobQueueTimerCallback(io.micrometer.core.instrument.Timer 
jobQueueTimer) {
+    this.jobQueueTimer.set(Optional.of(jobQueueTimer));
+  }
+
+  // Used for unit testing, can return the map as is because
+  // it is a ConcurrentHashMap
+  @VisibleForTesting
+  Map<KeyExtent,Timer> getJobAges() {
+    return jobAges;
+  }
+
+  public static class CompactionJobPriorityQueueStats {
+    private final Duration minAge;
+    private final Duration maxAge;
+    private final Duration avgAge;
+
+    @VisibleForTesting
+    CompactionJobPriorityQueueStats(Map<KeyExtent,Timer> jobAges) {
+      final Stat stats = new Stat();
+      jobAges.values().forEach(t -> 
stats.addStat(t.elapsed(TimeUnit.MILLISECONDS)));
+      this.minAge = Duration.ofMillis(stats.min());
+      this.maxAge = Duration.ofMillis(stats.max());
+      this.avgAge = Duration.ofMillis(Math.round(stats.mean()));
+    }
+
+    public Duration getMinAge() {
+      return minAge;
+    }
+
+    public Duration getMaxAge() {
+      return maxAge;
+    }
+
+    public Duration getAvgAge() {
+      return avgAge;
+    }
   }
 }
diff --git 
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
 
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
index 05e0b35c55..2592be35aa 100644
--- 
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
+++ 
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
@@ -33,11 +33,13 @@ import 
org.apache.accumulo.core.client.admin.compaction.CompactableFile;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.metadata.CompactableFileImpl;
+import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.spi.compaction.CompactionJob;
 import org.apache.accumulo.core.spi.compaction.CompactorGroupId;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.compaction.CompactionJobPrioritizer;
+import 
org.apache.accumulo.manager.compaction.queue.CompactionJobPriorityQueue.CompactionJobPriorityQueueStats;
 import 
org.apache.accumulo.manager.compaction.queue.CompactionJobQueues.MetaJob;
 import org.apache.hadoop.io.Text;
 import org.easymock.EasyMock;
@@ -193,21 +195,26 @@ public class CompactionJobPriorityQueueTest {
     assertEquals(0, queue.getDequeuedJobs());
     assertEquals(1, queue.getRejectedJobs());
     assertEquals(2, queue.getQueuedJobs());
+    // One tablet was added with jobs
+    assertEquals(1, queue.getJobAges().size());
 
     MetaJob job = queue.poll();
     assertEquals(cj1, job.getJob());
     assertEquals(tm, job.getTabletMetadata());
     assertEquals(1, queue.getDequeuedJobs());
+    // still 1 job left so should still have a timer
+    assertEquals(1, queue.getJobAges().size());
 
     job = queue.poll();
     assertEquals(cj2, job.getJob());
     assertEquals(tm, job.getTabletMetadata());
     assertEquals(2, queue.getDequeuedJobs());
+    // no more jobs so timer should be gone
+    assertTrue(queue.getJobAges().isEmpty());
 
     job = queue.poll();
     assertNull(job);
     assertEquals(2, queue.getDequeuedJobs());
-
   }
 
   private static int counter = 1;
@@ -251,6 +258,14 @@ public class CompactionJobPriorityQueueTest {
     assertEquals(100, queue.getMaxSize());
     assertEquals(100, queue.getQueuedJobs());
     assertEquals(900, queue.getRejectedJobs());
+    // There should be 1000 total job ages even though 900 were rejected
+    // as there were 1000 total tablets added
+    assertEquals(1000, queue.getJobAges().size());
+
+    var stats = queue.getJobQueueStats();
+    assertTrue(stats.getMinAge().toMillis() > 0);
+    assertTrue(stats.getMaxAge().toMillis() > 0);
+    assertTrue(stats.getAvgAge().toMillis() > 0);
 
     // iterate over the expected set and make sure that they next job in the 
queue
     // matches
@@ -266,6 +281,29 @@ public class CompactionJobPriorityQueueTest {
     }
 
     assertEquals(100, matchesSeen);
+    // Should be 900 left as the 100 that were polled would clear as there are 
no more
+    // jobs for those tablets. These 900 were rejected so their timers remain 
and will
+    // be cleared if there are no computed jobs when jobs are added again or by
+    // the call to removeOlderGenerations()
+    assertEquals(900, queue.getJobAges().size());
+
+    // Create new stats directly vs using queue.getJobQueueStats() because 
that method
+    // caches the results for a short period
+    stats = new CompactionJobPriorityQueueStats(queue.getJobAges());
+    assertTrue(stats.getMinAge().toMillis() > 0);
+    assertTrue(stats.getMaxAge().toMillis() > 0);
+    assertTrue(stats.getAvgAge().toMillis() > 0);
+
+    // Verify jobAges cleared when calling removeOlderGenerations()
+    queue.removeOlderGenerations(DataLevel.USER, 2);
+
+    // Stats should be 0 if no jobs
+    var jobAges = queue.getJobAges();
+    assertTrue(jobAges.isEmpty());
+    stats = new CompactionJobPriorityQueueStats(queue.getJobAges());
+    assertEquals(0, stats.getMinAge().toMillis());
+    assertEquals(0, stats.getMaxAge().toMillis());
+    assertEquals(0, stats.getAvgAge().toMillis());
   }
 
   /**
diff --git 
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
 
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
index 09ae416091..bba27daf67 100644
--- 
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
+++ 
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
@@ -362,12 +362,20 @@ public class CompactionJobQueuesTest {
 
     jobQueues.add(tm1, List.of(newJob((short) 1, 5, cg1)));
     jobQueues.add(tm2, List.of(newJob((short) 2, 6, cg1)));
+    // Futures were immediately completed so nothing should be queued
+    assertTrue(jobQueues.getQueue(cg1).getJobAges().isEmpty());
+
     jobQueues.add(tm3, List.of(newJob((short) 3, 7, cg1)));
     jobQueues.add(tm4, List.of(newJob((short) 4, 8, cg1)));
+    // No futures available, so jobAges should exist for 2 tablets
+    assertEquals(2, jobQueues.getQueue(cg1).getJobAges().size());
 
     var future3 = jobQueues.getAsync(cg1);
     var future4 = jobQueues.getAsync(cg1);
 
+    // Should be back to 0 size after futures complete
+    assertTrue(jobQueues.getQueue(cg1).getJobAges().isEmpty());
+
     assertTrue(future1.isDone());
     assertTrue(future2.isDone());
     assertTrue(future3.isDone());
diff --git 
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionMetricsIT.java
 
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionMetricsIT.java
index 494e9fe6ca..3245e6bf24 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionMetricsIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionMetricsIT.java
@@ -18,6 +18,10 @@
  */
 package org.apache.accumulo.test.compaction;
 
+import static 
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_AVG_AGE;
+import static 
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_MAX_AGE;
+import static 
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_MIN_AGE;
+import static 
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_POLL_TIMER;
 import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP1;
 import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP2;
 import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP3;
@@ -151,7 +155,6 @@ public class ExternalCompactionMetricsIT extends 
SharedMiniClusterBase {
 
       boolean sawDCQ1_5 = false;
       boolean sawDCQ2_10 = false;
-
       // wait until expected number of queued are seen in metrics
       while (!sawDCQ1_5 || !sawDCQ2_10) {
         Metric qm = queueMetrics.take();
@@ -165,12 +168,22 @@ public class ExternalCompactionMetricsIT extends 
SharedMiniClusterBase {
 
       boolean sawDCQ1_0 = false;
       boolean sawDCQ2_0 = false;
+      boolean minDCQ1 = false;
+      boolean maxDCQ1 = false;
+      boolean avgDCQ1 = false;
+      boolean timerDCQ1 = false;
 
       // wait until queued goes to zero in metrics
-      while (!sawDCQ1_0 || !sawDCQ2_0) {
+      // and verify stats are positive values
+      while (!sawDCQ1_0 || !sawDCQ2_0 || !minDCQ1 || !maxDCQ1 || !avgDCQ1 || 
!timerDCQ1) {
         Metric qm = queueMetrics.take();
         sawDCQ1_0 |= match(qm, "dcq1", "0");
         sawDCQ2_0 |= match(qm, "dcq2", "0");
+        minDCQ1 |= assertMetric(qm, "dcq1", 
COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_MIN_AGE.getName());
+        maxDCQ1 |= assertMetric(qm, "dcq1", 
COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_MAX_AGE.getName());
+        avgDCQ1 |= assertMetric(qm, "dcq1", 
COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_AVG_AGE.getName());
+        timerDCQ1 |=
+            assertMetric(qm, "dcq1", 
COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_POLL_TIMER.getName());
       }
 
       shutdownTailer.set(true);
@@ -204,4 +217,15 @@ public class ExternalCompactionMetricsIT extends 
SharedMiniClusterBase {
     return false;
   }
 
+  private static boolean assertMetric(Metric input, String queue, String name) 
{
+    if (input.getTags() != null) {
+      String id = input.getTags().get("queue.id");
+      if (id != null && id.equals(queue) && input.getName().equals(name)
+          && Double.parseDouble(input.getValue()) > 0) {
+        return true;
+      }
+    }
+    return false;
+  }
+
 }

Reply via email to