This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 6cfdfc242a3 Adds metrics for compaction queue data size (#5275)
6cfdfc242a3 is described below

commit 6cfdfc242a31b36f2aae0c4f9e4c9121e2f57555
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Wed Jan 22 11:45:48 2025 -0500

    Adds metrics for compaction queue data size (#5275)
    
    Added a metric for compaction queue data size, removed some unused code,
    and reworked an existing compaction metric for the compaction job size
    changes.
    
    Fixes #5269
---
 .../java/org/apache/accumulo/core/metrics/Metric.java    |  6 ++++--
 .../manager/compaction/coordinator/QueueMetrics.java     | 15 ++++++++++++---
 .../compaction/queue/CompactionJobPriorityQueue.java     | 10 ++++------
 .../compaction/queue/CompactionJobPriorityQueueTest.java | 13 -------------
 .../compaction/CompactionPriorityQueueMetricsIT.java     | 16 +++++++++++++---
 .../java/org/apache/accumulo/test/metrics/MetricsIT.java | 10 +++++++---
 6 files changed, 40 insertions(+), 30 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java 
b/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java
index da61305fc21..e5ddf3be6a4 100644
--- a/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java
+++ b/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java
@@ -54,12 +54,14 @@ public enum Metric {
       MetricDocSection.COMPACTION),
   COMPACTOR_JOB_PRIORITY_QUEUES("accumulo.compaction.queue.count", 
MetricType.GAUGE,
       "Number of priority queues for compaction jobs.", 
MetricDocSection.COMPACTION),
-  COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH("accumulo.compaction.queue.length", 
MetricType.GAUGE,
-      "Length of priority queue.", MetricDocSection.COMPACTION),
+  COMPACTOR_JOB_PRIORITY_QUEUE_MAX_SIZE("accumulo.compaction.queue.max.size", 
MetricType.GAUGE,
+      "The maximum size in bytes of all jobs.", MetricDocSection.COMPACTION),
   
COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_DEQUEUED("accumulo.compaction.queue.jobs.dequeued",
       MetricType.GAUGE, "Count of dequeued jobs.", 
MetricDocSection.COMPACTION),
   
COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED("accumulo.compaction.queue.jobs.queued",
       MetricType.GAUGE, "Count of queued jobs.", MetricDocSection.COMPACTION),
+  
COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_SIZE("accumulo.compaction.queue.jobs.size", 
MetricType.GAUGE,
+      "Size of queued jobs in bytes.", MetricDocSection.COMPACTION),
   
COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED("accumulo.compaction.queue.jobs.rejected",
       MetricType.GAUGE, "Count of rejected jobs.", 
MetricDocSection.COMPACTION),
   
COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY("accumulo.compaction.queue.jobs.priority",
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/QueueMetrics.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/QueueMetrics.java
index aa429d6999d..b0771bc2fe2 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/QueueMetrics.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/QueueMetrics.java
@@ -27,7 +27,8 @@ import static 
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUE
 import static 
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY;
 import static 
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED;
 import static 
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED;
-import static 
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH;
+import static 
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_SIZE;
+import static 
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_MAX_SIZE;
 import static org.apache.accumulo.core.metrics.MetricsUtil.formatString;
 
 import java.util.HashMap;
@@ -59,6 +60,7 @@ public class QueueMetrics implements MetricsProducer {
   private static class QueueMeters {
     private final Gauge length;
     private final Gauge jobsQueued;
+    private final Gauge jobsQueuedSize;
     private final Gauge jobsDequeued;
     private final Gauge jobsRejected;
     private final Gauge jobsLowestPriority;
@@ -72,8 +74,8 @@ public class QueueMetrics implements MetricsProducer {
       var queueId = formatString(cgid.canonical());
 
       length =
-          Gauge.builder(COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH.getName(), queue, 
q -> q.getMaxSize())
-              
.description(COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH.getDescription())
+          Gauge.builder(COMPACTOR_JOB_PRIORITY_QUEUE_MAX_SIZE.getName(), 
queue, q -> q.getMaxSize())
+              
.description(COMPACTOR_JOB_PRIORITY_QUEUE_MAX_SIZE.getDescription())
               .tags(List.of(Tag.of("queue.id", 
queueId))).register(meterRegistry);
 
       jobsQueued = Gauge
@@ -82,6 +84,12 @@ public class QueueMetrics implements MetricsProducer {
           
.description(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED.getDescription())
           .tags(List.of(Tag.of("queue.id", queueId))).register(meterRegistry);
 
+      jobsQueuedSize = Gauge
+          .builder(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_SIZE.getName(), queue,
+              q -> q.getQueuedJobsSize())
+          .description(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_SIZE.getDescription())
+          .tags(List.of(Tag.of("queue.id", queueId))).register(meterRegistry);
+
       jobsDequeued = Gauge
           .builder(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_DEQUEUED.getName(), queue,
               q -> q.getDequeuedJobs())
@@ -134,6 +142,7 @@ public class QueueMetrics implements MetricsProducer {
       registry.remove(jobsMaxAge);
       registry.remove(jobsAvgAge);
       registry.remove(jobsQueueTimer);
+      registry.remove(jobsQueuedSize);
     }
   }
 
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
index f183b50b86f..9a36f07f561 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
@@ -233,12 +233,6 @@ public class CompactionJobPriorityQueue {
     return maxSize.get();
   }
 
-  public synchronized void setMaxSize(long maxSize) {
-    Preconditions.checkArgument(maxSize > 0,
-        "Maximum size of the Compaction job priority queue must be greater 
than 0");
-    this.maxSize.set(maxSize);
-  }
-
   public long getRejectedJobs() {
     return rejectedJobs.get();
   }
@@ -251,6 +245,10 @@ public class CompactionJobPriorityQueue {
     return jobQueue.entrySize();
   }
 
+  public synchronized long getQueuedJobsSize() {
+    return jobQueue.dataSize();
+  }
+
   public synchronized long getLowestPriority() {
     if (jobQueue.isEmpty()) {
       return 0;
diff --git 
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
 
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
index 01c18798754..6617ad5cef2 100644
--- 
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
+++ 
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
@@ -20,7 +20,6 @@ package org.apache.accumulo.manager.compaction.queue;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.ArrayList;
@@ -339,16 +338,4 @@ public class CompactionJobPriorityQueueTest {
         < 2 * (CompactionJobPriorityQueue.FUTURE_CHECK_THRESHOLD + 
CANCEL_THRESHOLD));
     assertTrue(maxFuturesSize > 2 * 
CompactionJobPriorityQueue.FUTURE_CHECK_THRESHOLD);
   }
-
-  @Test
-  public void testChangeMaxSize() {
-    CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 
100, mj -> 1);
-    assertEquals(100, queue.getMaxSize());
-    queue.setMaxSize(50);
-    assertEquals(50, queue.getMaxSize());
-    assertThrows(IllegalArgumentException.class, () -> queue.setMaxSize(0));
-    assertThrows(IllegalArgumentException.class, () -> queue.setMaxSize(-1));
-    // Make sure previous value was not changed after invalid setting
-    assertEquals(50, queue.getMaxSize());
-  }
 }
diff --git 
a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java
 
b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java
index 2a032565953..3a8a6a601b4 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java
@@ -22,7 +22,8 @@ import static 
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUE
 import static 
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY;
 import static 
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED;
 import static 
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED;
-import static 
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH;
+import static 
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_SIZE;
+import static 
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_MAX_SIZE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -337,7 +338,9 @@ public class CompactionPriorityQueueMetricsIT extends 
SharedMiniClusterBase {
     }
 
     boolean sawMetricsQ1 = false;
-    while (!sawMetricsQ1) {
+    boolean sawMetricsQ1Size = false;
+
+    while (!sawMetricsQ1 || !sawMetricsQ1Size) {
       while (!queueMetrics.isEmpty()) {
         var qm = queueMetrics.take();
         if 
(qm.getName().contains(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED.getName())
@@ -346,7 +349,14 @@ public class CompactionPriorityQueueMetricsIT extends 
SharedMiniClusterBase {
             sawMetricsQ1 = true;
           }
         }
+        if 
(qm.getName().contains(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_SIZE.getName())
+            && qm.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
+          if (Integer.parseInt(qm.getValue()) > 0) {
+            sawMetricsQ1Size = true;
+          }
+        }
       }
+
       // If metrics are not found in the queue, sleep until the next poll.
       
UtilWaitThread.sleep(TestStatsDRegistryFactory.pollingFrequency.toMillis());
     }
@@ -366,7 +376,7 @@ public class CompactionPriorityQueueMetricsIT extends 
SharedMiniClusterBase {
       } else if 
(metric.getName().contains(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY.getName())
           && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
         lowestPriority = Math.max(lowestPriority, 
Long.parseLong(metric.getValue()));
-      } else if 
(metric.getName().contains(COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH.getName())
+      } else if 
(metric.getName().contains(COMPACTOR_JOB_PRIORITY_QUEUE_MAX_SIZE.getName())
           && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
         queueSize = Integer.parseInt(metric.getValue());
       } else if 
(metric.getName().contains(COMPACTOR_JOB_PRIORITY_QUEUES.getName())) {
diff --git a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java 
b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
index a5c777c64d4..e7492968cf5 100644
--- a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
@@ -149,9 +149,10 @@ public class MetricsIT extends ConfigurableMacBase 
implements MetricsProducer {
     final int compactionPriorityQueueDequeuedBit = 2;
     final int compactionPriorityQueueRejectedBit = 3;
     final int compactionPriorityQueuePriorityBit = 4;
+    final int compactionPriorityQueueSizeBit = 5;
 
-    final BitSet trueSet = new BitSet(5);
-    trueSet.set(0, 4, true);
+    final BitSet trueSet = new BitSet(6);
+    trueSet.set(0, 5, true);
 
     final BitSet queueMetricsSeen = new BitSet(5);
 
@@ -187,7 +188,7 @@ public class MetricsIT extends ConfigurableMacBase 
implements MetricsProducer {
               seenMetrics.add(metric);
               expectedMetrics.remove(metric);
               switch (metric) {
-                case COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH:
+                case COMPACTOR_JOB_PRIORITY_QUEUE_MAX_SIZE:
                   queueMetricsSeen.set(compactionPriorityQueueLengthBit, true);
                   break;
                 case COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED:
@@ -202,6 +203,9 @@ public class MetricsIT extends ConfigurableMacBase 
implements MetricsProducer {
                 case COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY:
                   queueMetricsSeen.set(compactionPriorityQueuePriorityBit, 
true);
                   break;
+                case COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_SIZE:
+                  queueMetricsSeen.set(compactionPriorityQueueSizeBit, true);
+                  break;
                 default:
                   break;
               }

Reply via email to