This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 6cfdfc242a3 Adds metrics for compaction queue data size (#5275) 6cfdfc242a3 is described below commit 6cfdfc242a31b36f2aae0c4f9e4c9121e2f57555 Author: Keith Turner <ktur...@apache.org> AuthorDate: Wed Jan 22 11:45:48 2025 -0500 Adds metrics for compaction queue data size (#5275) Added a metric for compaction queue data size, removed some unused code, and reworked an existing compaction metric for the compaction job size changes. Fixes #5269 --- .../java/org/apache/accumulo/core/metrics/Metric.java | 6 ++++-- .../manager/compaction/coordinator/QueueMetrics.java | 15 ++++++++++++--- .../compaction/queue/CompactionJobPriorityQueue.java | 10 ++++------ .../compaction/queue/CompactionJobPriorityQueueTest.java | 13 ------------- .../compaction/CompactionPriorityQueueMetricsIT.java | 16 +++++++++++++--- .../java/org/apache/accumulo/test/metrics/MetricsIT.java | 10 +++++++--- 6 files changed, 40 insertions(+), 30 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java b/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java index da61305fc21..e5ddf3be6a4 100644 --- a/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java +++ b/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java @@ -54,12 +54,14 @@ public enum Metric { MetricDocSection.COMPACTION), COMPACTOR_JOB_PRIORITY_QUEUES("accumulo.compaction.queue.count", MetricType.GAUGE, "Number of priority queues for compaction jobs.", MetricDocSection.COMPACTION), - COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH("accumulo.compaction.queue.length", MetricType.GAUGE, - "Length of priority queue.", MetricDocSection.COMPACTION), + COMPACTOR_JOB_PRIORITY_QUEUE_MAX_SIZE("accumulo.compaction.queue.max.size", MetricType.GAUGE, + "The maximum size in bytes of all jobs.", MetricDocSection.COMPACTION), COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_DEQUEUED("accumulo.compaction.queue.jobs.dequeued", MetricType.GAUGE, "Count of dequeued jobs.", MetricDocSection.COMPACTION), COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED("accumulo.compaction.queue.jobs.queued", MetricType.GAUGE, "Count of queued jobs.", MetricDocSection.COMPACTION), + COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_SIZE("accumulo.compaction.queue.jobs.size", MetricType.GAUGE, + "Size of queued jobs in bytes.", MetricDocSection.COMPACTION), COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED("accumulo.compaction.queue.jobs.rejected", MetricType.GAUGE, "Count of rejected jobs.", MetricDocSection.COMPACTION), COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY("accumulo.compaction.queue.jobs.priority", diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/QueueMetrics.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/QueueMetrics.java index aa429d6999d..b0771bc2fe2 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/QueueMetrics.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/QueueMetrics.java @@ -27,7 +27,8 @@ import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUE import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY; import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED; import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED; -import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH; +import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_SIZE; +import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_MAX_SIZE; import static org.apache.accumulo.core.metrics.MetricsUtil.formatString; import java.util.HashMap; @@ -59,6 +60,7 @@ public class QueueMetrics implements MetricsProducer { private static class QueueMeters { private final Gauge length; private final Gauge jobsQueued; + private final Gauge jobsQueuedSize; private final Gauge jobsDequeued; private final Gauge jobsRejected; private final Gauge jobsLowestPriority; @@ -72,8 +74,8 @@ public class QueueMetrics implements MetricsProducer { var queueId = formatString(cgid.canonical()); length = - Gauge.builder(COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH.getName(), queue, q -> q.getMaxSize()) - .description(COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH.getDescription()) + Gauge.builder(COMPACTOR_JOB_PRIORITY_QUEUE_MAX_SIZE.getName(), queue, q -> q.getMaxSize()) + .description(COMPACTOR_JOB_PRIORITY_QUEUE_MAX_SIZE.getDescription()) .tags(List.of(Tag.of("queue.id", queueId))).register(meterRegistry); jobsQueued = Gauge @@ -82,6 +84,12 @@ public class QueueMetrics implements MetricsProducer { .description(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED.getDescription()) .tags(List.of(Tag.of("queue.id", queueId))).register(meterRegistry); + jobsQueuedSize = Gauge + .builder(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_SIZE.getName(), queue, + q -> q.getQueuedJobsSize()) + .description(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_SIZE.getDescription()) + .tags(List.of(Tag.of("queue.id", queueId))).register(meterRegistry); + jobsDequeued = Gauge .builder(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_DEQUEUED.getName(), queue, q -> q.getDequeuedJobs()) @@ -134,6 +142,7 @@ public class QueueMetrics implements MetricsProducer { registry.remove(jobsMaxAge); registry.remove(jobsAvgAge); registry.remove(jobsQueueTimer); + registry.remove(jobsQueuedSize); } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java index f183b50b86f..9a36f07f561 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java @@ -233,12 +233,6 @@ public class CompactionJobPriorityQueue { return maxSize.get(); } - public synchronized void setMaxSize(long maxSize) { - Preconditions.checkArgument(maxSize > 0, - "Maximum size of the Compaction job priority queue must be greater than 0"); - this.maxSize.set(maxSize); - } - public long getRejectedJobs() { return rejectedJobs.get(); } @@ -251,6 +245,10 @@ public class CompactionJobPriorityQueue { return jobQueue.entrySize(); } + public synchronized long getQueuedJobsSize() { + return jobQueue.dataSize(); + } + public synchronized long getLowestPriority() { if (jobQueue.isEmpty()) { return 0; diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java index 01c18798754..6617ad5cef2 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java @@ -20,7 +20,6 @@ package org.apache.accumulo.manager.compaction.queue; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.ArrayList; @@ -339,16 +338,4 @@ public class CompactionJobPriorityQueueTest { < 2 * (CompactionJobPriorityQueue.FUTURE_CHECK_THRESHOLD + CANCEL_THRESHOLD)); assertTrue(maxFuturesSize > 2 * CompactionJobPriorityQueue.FUTURE_CHECK_THRESHOLD); } - - @Test - public void testChangeMaxSize() { - CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 100, mj -> 1); - assertEquals(100, queue.getMaxSize()); - queue.setMaxSize(50); - assertEquals(50, queue.getMaxSize()); - assertThrows(IllegalArgumentException.class, () -> queue.setMaxSize(0)); - assertThrows(IllegalArgumentException.class, () -> queue.setMaxSize(-1)); - // Make sure previous value was not changed after invalid setting - assertEquals(50, queue.getMaxSize()); - } } diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java index 2a032565953..3a8a6a601b4 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java @@ -22,7 +22,8 @@ import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUE import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY; import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED; import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED; -import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH; +import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_SIZE; +import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_MAX_SIZE; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -337,7 +338,9 @@ public class CompactionPriorityQueueMetricsIT extends SharedMiniClusterBase { } boolean sawMetricsQ1 = false; - while (!sawMetricsQ1) { + boolean sawMetricsQ1Size = false; + + while (!sawMetricsQ1 || !sawMetricsQ1Size) { while (!queueMetrics.isEmpty()) { var qm = queueMetrics.take(); if (qm.getName().contains(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED.getName()) @@ -346,7 +349,14 @@ public class CompactionPriorityQueueMetricsIT extends SharedMiniClusterBase { sawMetricsQ1 = true; } } + if (qm.getName().contains(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_SIZE.getName()) + && qm.getTags().containsValue(QUEUE1_METRIC_LABEL)) { + if (Integer.parseInt(qm.getValue()) > 0) { + sawMetricsQ1Size = true; + } + } } + // If metrics are not found in the queue, sleep until the next poll. UtilWaitThread.sleep(TestStatsDRegistryFactory.pollingFrequency.toMillis()); } @@ -366,7 +376,7 @@ public class CompactionPriorityQueueMetricsIT extends SharedMiniClusterBase { } else if (metric.getName().contains(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY.getName()) && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) { lowestPriority = Math.max(lowestPriority, Long.parseLong(metric.getValue())); - } else if (metric.getName().contains(COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH.getName()) + } else if (metric.getName().contains(COMPACTOR_JOB_PRIORITY_QUEUE_MAX_SIZE.getName()) && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) { queueSize = Integer.parseInt(metric.getValue()); } else if (metric.getName().contains(COMPACTOR_JOB_PRIORITY_QUEUES.getName())) { diff --git a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java index a5c777c64d4..e7492968cf5 100644 --- a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java @@ -149,9 +149,10 @@ public class MetricsIT extends ConfigurableMacBase implements MetricsProducer { final int compactionPriorityQueueDequeuedBit = 2; final int compactionPriorityQueueRejectedBit = 3; final int compactionPriorityQueuePriorityBit = 4; + final int compactionPriorityQueueSizeBit = 5; - final BitSet trueSet = new BitSet(5); - trueSet.set(0, 4, true); + final BitSet trueSet = new BitSet(6); + trueSet.set(0, 5, true); final BitSet queueMetricsSeen = new BitSet(5); @@ -187,7 +188,7 @@ public class MetricsIT extends ConfigurableMacBase implements MetricsProducer { seenMetrics.add(metric); expectedMetrics.remove(metric); switch (metric) { - case COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH: + case COMPACTOR_JOB_PRIORITY_QUEUE_MAX_SIZE: queueMetricsSeen.set(compactionPriorityQueueLengthBit, true); break; case COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED: @@ -202,6 +203,9 @@ public class MetricsIT extends ConfigurableMacBase implements MetricsProducer { case COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY: queueMetricsSeen.set(compactionPriorityQueuePriorityBit, true); break; + case COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_SIZE: + queueMetricsSeen.set(compactionPriorityQueueSizeBit, true); + break; default: break; }