This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 41d382638a Fix MetricsIT and compaction queue metrics (#4069) 41d382638a is described below commit 41d382638a2e8b776145ac831408e49c27d576f7 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Thu Dec 14 15:33:22 2023 -0500 Fix MetricsIT and compaction queue metrics (#4069) Retain references to the Gauges that are created for each compaction queue so that they can be removed when the queue is removed. Added new constant in MetricsProducer for compaction queues that is used in MetricsIT. Refactored MetricsIT test to ensure that compaction queue metrics are seen before stopping MAC. --- .../accumulo/core/metrics/MetricsProducer.java | 14 +-- .../compaction/queue/CompactionJobQueues.java | 4 + .../accumulo/manager/metrics/QueueMetrics.java | 131 ++++++++++++++------- .../apache/accumulo/test/metrics/MetricsIT.java | 65 +++++++++- 4 files changed, 162 insertions(+), 52 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java index 8c785267f2..1dd7fcfd99 100644 --- a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java +++ b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java @@ -660,17 +660,17 @@ public interface MetricsProducer { String METRICS_LOW_MEMORY = "accumulo.detected.low.memory"; String METRICS_COMPACTOR_PREFIX = "accumulo.compactor."; String METRICS_COMPACTOR_MAJC_STUCK = METRICS_COMPACTOR_PREFIX + "majc.stuck"; - String METRICS_COMPACTOR_JOB_PRIORITY_QUEUES = METRICS_COMPACTOR_PREFIX + "queue.count"; - String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH = METRICS_COMPACTOR_PREFIX + "queue.length"; + String METRICS_COMPACTOR_QUEUE_PREFIX = METRICS_COMPACTOR_PREFIX + "queue."; + String METRICS_COMPACTOR_JOB_PRIORITY_QUEUES = METRICS_COMPACTOR_QUEUE_PREFIX + "count"; + String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH = METRICS_COMPACTOR_QUEUE_PREFIX + "length"; String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_DEQUEUED = - METRICS_COMPACTOR_PREFIX + "queue.jobs.dequeued"; + METRICS_COMPACTOR_QUEUE_PREFIX + "jobs.dequeued"; String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED = - METRICS_COMPACTOR_PREFIX + "queue.jobs.queued"; + METRICS_COMPACTOR_QUEUE_PREFIX + "jobs.queued"; String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED = - METRICS_COMPACTOR_PREFIX + "queue.jobs.rejected"; - + METRICS_COMPACTOR_QUEUE_PREFIX + "jobs.rejected"; String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY = - METRICS_COMPACTOR_PREFIX + "queue.jobs.priority"; + METRICS_COMPACTOR_QUEUE_PREFIX + "jobs.priority"; String METRICS_FATE_PREFIX = "accumulo.fate."; String METRICS_FATE_TYPE_IN_PROGRESS = METRICS_FATE_PREFIX + "ops.in.progress.by.type"; diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java index 0ff572cb8a..f6090effd2 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java @@ -62,6 +62,10 @@ public class CompactionJobQueues { return priorityQueues.keySet(); } + public CompactionJobPriorityQueue getQueue(CompactionExecutorId executorId) { + return priorityQueues.get(executorId); + } + public long getQueueMaxSize(CompactionExecutorId executorId) { var prioQ = priorityQueues.get(executorId); return prioQ == null ? 0 : prioQ.getMaxSize(); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/QueueMetrics.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/QueueMetrics.java index d73fefc64f..87925ceb10 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/QueueMetrics.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/QueueMetrics.java @@ -18,27 +18,89 @@ */ package org.apache.accumulo.manager.metrics; -import static org.apache.accumulo.core.metrics.MetricsUtil.formatString; import static org.apache.accumulo.core.metrics.MetricsUtil.getCommonTags; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import org.apache.accumulo.core.metrics.MetricsProducer; import org.apache.accumulo.core.spi.compaction.CompactionExecutorId; import org.apache.accumulo.core.util.threads.ThreadPools; +import org.apache.accumulo.manager.compaction.queue.CompactionJobPriorityQueue; import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Sets; +import com.google.common.collect.Sets.SetView; import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Tags; public class QueueMetrics implements MetricsProducer { + + private static class QueueMeters { + private final Gauge length; + private final Gauge jobsQueued; + private final Gauge jobsDequeued; + private final Gauge jobsRejected; + private final Gauge jobsLowestPriority; + + public QueueMeters(MeterRegistry meterRegistry, CompactionExecutorId queueId, + CompactionJobPriorityQueue queue) { + length = + Gauge.builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH, queue, q -> q.getMaxSize()) + .description("Length of priority queues") + .tags(Tags.concat(getCommonTags(), "queue.id", queueId.canonical())) + .register(meterRegistry); + + jobsQueued = Gauge + .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED, queue, q -> q.getQueuedJobs()) + .description("Count of queued jobs") + .tags(Tags.concat(getCommonTags(), "queue.id", queueId.canonical())) + .register(meterRegistry); + + jobsDequeued = Gauge + .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_DEQUEUED, queue, + q -> q.getDequeuedJobs()) + .description("Count of jobs dequeued") + .tags(Tags.concat(getCommonTags(), "queue.id", queueId.canonical())) + .register(meterRegistry); + + jobsRejected = Gauge + .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED, queue, + q -> q.getRejectedJobs()) + .description("Count of rejected jobs") + .tags(Tags.concat(getCommonTags(), "queue.id", queueId.canonical())) + .register(meterRegistry); + + jobsLowestPriority = Gauge + .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY, queue, + q -> q.getLowestPriority()) + .description("Lowest priority queued job") + .tags(Tags.concat(getCommonTags(), "queue.id", queueId.canonical())) + .register(meterRegistry); + } + + private void removeMeters(MeterRegistry registry) { + registry.remove(length); + registry.remove(jobsQueued); + registry.remove(jobsDequeued); + registry.remove(jobsRejected); + registry.remove(jobsLowestPriority); + } + } + + private static final Logger LOG = LoggerFactory.getLogger(QueueMetrics.class); private static final long DEFAULT_MIN_REFRESH_DELAY = TimeUnit.SECONDS.toMillis(5); private MeterRegistry meterRegistry = null; private final CompactionJobQueues compactionJobQueues; - private AtomicLong queueCount; + private final Map<CompactionExecutorId,QueueMeters> perQueueMetrics = new HashMap<>(); + private Gauge queueCountMeter = null; public QueueMetrics(CompactionJobQueues compactionJobQueues) { this.compactionJobQueues = compactionJobQueues; @@ -51,46 +113,35 @@ public class QueueMetrics implements MetricsProducer { public void update() { - Gauge - .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUES, compactionJobQueues, - CompactionJobQueues::getQueueCount) - .description("Number of current Queues").tags(getCommonTags()).register(meterRegistry); - - for (CompactionExecutorId ceid : compactionJobQueues.getQueueIds()) { - // Normalize the queueId to match metrics tag naming convention. - String queueId = formatString(ceid.toString()); - - // Register queues by ID rather than by object as queues can be deleted. - Gauge - .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH, ceid, - compactionJobQueues::getQueueMaxSize) - .description("Length of priority queues") - .tags(Tags.concat(getCommonTags(), "queue.id", queueId)).register(meterRegistry); - - Gauge - .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED, ceid, - compactionJobQueues::getQueuedJobs) - .description("Count of queued jobs") - .tags(Tags.concat(getCommonTags(), "queue.id", queueId)).register(meterRegistry); + if (queueCountMeter == null) { + queueCountMeter = Gauge + .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUES, compactionJobQueues, + CompactionJobQueues::getQueueCount) + .description("Number of current Queues").tags(getCommonTags()).register(meterRegistry); + } + LOG.debug("update - cjq queues: {}", compactionJobQueues.getQueueIds()); - Gauge - .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_DEQUEUED, ceid, - compactionJobQueues::getDequeuedJobs) - .description("Count of jobs dequeued") - .tags(Tags.concat(getCommonTags(), "queue.id", queueId)).register(meterRegistry); + Set<CompactionExecutorId> definedQueues = compactionJobQueues.getQueueIds(); + LOG.debug("update - defined queues: {}", definedQueues); - Gauge - .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED, ceid, - compactionJobQueues::getRejectedJobs) - .description("Count of rejected jobs") - .tags(Tags.concat(getCommonTags(), "queue.id", queueId)).register(meterRegistry); + Set<CompactionExecutorId> queuesWithMetrics = perQueueMetrics.keySet(); + LOG.debug("update - queues with metrics: {}", queuesWithMetrics); + + SetView<CompactionExecutorId> queuesWithoutMetrics = + Sets.difference(definedQueues, queuesWithMetrics); + queuesWithoutMetrics.forEach(q -> { + LOG.debug("update - creating meters for queue: {}", q); + perQueueMetrics.put(q, new QueueMeters(meterRegistry, q, compactionJobQueues.getQueue(q))); + }); + + SetView<CompactionExecutorId> metricsWithoutQueues = + Sets.difference(queuesWithMetrics, definedQueues); + metricsWithoutQueues.forEach(q -> { + LOG.debug("update - removing meters for queue: {}", q); + perQueueMetrics.get(q).removeMeters(meterRegistry); + perQueueMetrics.remove(q); + }); - Gauge - .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY, ceid, - compactionJobQueues::getLowestPriority) - .description("Lowest priority queued job") - .tags(Tags.concat(getCommonTags(), "queue.id", queueId)).register(meterRegistry); - } } @Override diff --git a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java index a9d351dc52..b68159877b 100644 --- a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java @@ -21,22 +21,26 @@ package org.apache.accumulo.test.metrics; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import java.time.Duration; +import java.util.BitSet; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.conf.Property; @@ -45,6 +49,7 @@ import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.metrics.MetricsProducer; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.test.functional.ConfigurableMacBase; +import org.apache.accumulo.test.functional.SlowIterator; import org.apache.accumulo.test.metrics.TestStatsDSink.Metric; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; @@ -60,7 +65,7 @@ public class MetricsIT extends ConfigurableMacBase implements MetricsProducer { @Override protected Duration defaultTimeout() { - return Duration.ofMinutes(1); + return Duration.ofMinutes(3); } @BeforeAll @@ -92,9 +97,6 @@ public class MetricsIT extends ConfigurableMacBase implements MetricsProducer { @Test public void confirmMetricsPublished() throws Exception { - doWorkToGenerateMetrics(); - cluster.stop(); - Set<String> unexpectedMetrics = Set.of(METRICS_SCAN_YIELDS, METRICS_UPDATE_ERRORS, METRICS_SCAN_BUSY_TIMEOUT, METRICS_SCAN_PAUSED_FOR_MEM, METRICS_SCAN_RETURN_FOR_MEM, METRICS_MINC_PAUSED, METRICS_MAJC_PAUSED); @@ -114,8 +116,30 @@ public class MetricsIT extends ConfigurableMacBase implements MetricsProducer { List<String> statsDMetrics; + final int compactionPriorityQueueLengthBit = 0; + final int compactionPriorityQueueQueuedBit = 1; + final int compactionPriorityQueueDequeuedBit = 2; + final int compactionPriorityQueueRejectedBit = 3; + final int compactionPriorityQueuePriorityBit = 4; + + final BitSet trueSet = new BitSet(5); + trueSet.set(0, 4, true); + + final BitSet queueMetricsSeen = new BitSet(5); + + AtomicReference<Exception> error = new AtomicReference<>(); + Thread workerThread = new Thread(() -> { + try { + doWorkToGenerateMetrics(); + } catch (Exception e) { + error.set(e); + } + }); + workerThread.start(); + // loop until we run out of lines or until we see all expected metrics - while (!(statsDMetrics = sink.getLines()).isEmpty() && !expectedMetricNames.isEmpty()) { + while (!(statsDMetrics = sink.getLines()).isEmpty() && !expectedMetricNames.isEmpty() + && !queueMetricsSeen.intersects(trueSet)) { // for each metric name not yet seen, check if it is expected, flaky, or unknown statsDMetrics.stream().filter(line -> line.startsWith("accumulo")) .map(TestStatsDSink::parseStatsDMetric).map(Metric::getName) @@ -126,14 +150,37 @@ public class MetricsIT extends ConfigurableMacBase implements MetricsProducer { } else if (flakyMetrics.contains(name)) { // ignore any flaky metric names seen // these aren't always expected, but we shouldn't be surprised if we see them + } else if (name.startsWith(METRICS_COMPACTOR_PREFIX)) { + // Compactor queue metrics are not guaranteed to be emitted + // during the call to doWorkToGenerateMetrics above. This will + // flip a bit in the BitSet when each metric is seen. The top-level + // loop will continue to iterate until all the metrics are seen. + seenMetricNames.put(name, expectedMetricNames.remove(name)); + if (METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH.equals(name)) { + queueMetricsSeen.set(compactionPriorityQueueLengthBit, true); + } else if (METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED.equals(name)) { + queueMetricsSeen.set(compactionPriorityQueueQueuedBit, true); + } else if (METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_DEQUEUED.equals(name)) { + queueMetricsSeen.set(compactionPriorityQueueDequeuedBit, true); + } else if (METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED.equals(name)) { + queueMetricsSeen.set(compactionPriorityQueueRejectedBit, true); + } else if (METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY.equals(name)) { + queueMetricsSeen.set(compactionPriorityQueuePriorityBit, true); + } } else { // completely unexpected metric fail("Found accumulo metric not in expectedMetricNames or flakyMetricNames: " + name); } }); + Thread.sleep(4_000); } assertTrue(expectedMetricNames.isEmpty(), "Did not see all expected metric names, missing: " + expectedMetricNames.values()); + + workerThread.join(); + assertNull(error.get()); + cluster.stop(); + } private void doWorkToGenerateMetrics() throws Exception { @@ -167,6 +214,14 @@ public class MetricsIT extends ConfigurableMacBase implements MetricsProducer { try (Scanner scanner = client.createScanner(tableName)) { scanner.forEach((k, v) -> {}); } + // Start a compaction with the slow iterator to ensure that the compaction queues + // are not removed quickly + CompactionConfig cc = new CompactionConfig(); + IteratorSetting is = new IteratorSetting(100, "slow", SlowIterator.class); + SlowIterator.setSleepTime(is, 3000); + cc.setIterators(List.of(is)); + cc.setWait(false); + client.tableOperations().compact(tableName, new CompactionConfig().setWait(true)); client.tableOperations().delete(tableName); while (client.tableOperations().exists(tableName)) { Thread.sleep(1000);