This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 890fc4741a fixes ExternalCompactionProgressIT (#4665) 890fc4741a is described below commit 890fc4741a7984407f303bfadd8ee858aad0a7f5 Author: Keith Turner <ktur...@apache.org> AuthorDate: Wed Jun 12 12:14:03 2024 -0400 fixes ExternalCompactionProgressIT (#4665) This test was validating the compactor busy count metric. The test has 9 compactor processes running. When it saw a busy count from any of the 9 it would set an atomic long. This made it likely that the 8 of 9 compactors that were not busy would set it to zero, making the test flaky. Replaced the atomic long w/ a concurrent map where each compactor process has an entry in the map for its busy count. --- .../compaction/ExternalCompactionProgressIT.java | 38 +++++++++++++++++----- 1 file changed, 30 insertions(+), 8 deletions(-) diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java index 3b3042a11f..a797faa36e 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java @@ -34,10 +34,11 @@ import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.accumulo.core.client.Accumulo; @@ -123,13 +124,26 @@ public class ExternalCompactionProgressIT extends AccumuloClusterHarness { cfg.setSystemProperties(sysProps); } + private static long computeBusyCount(String resourceGroup, + ConcurrentHashMap<String,Long> compactorBusy) { + var stats = 
compactorBusy.entrySet().stream().filter(e -> e.getKey().startsWith(resourceGroup + ":")) + .mapToLong(Map.Entry::getValue).summaryStatistics(); + if (stats.getCount() == 0) { + // signifies nothing was present, this differentiates between the case where things are + // present w/ a zero value + return -1; + } + return stats.getSum(); + } + @Test public void testProgressViaMetrics() throws Exception { String table = this.getUniqueNames(1)[0]; final AtomicLong totalEntriesRead = new AtomicLong(0); final AtomicLong totalEntriesWritten = new AtomicLong(0); - final AtomicInteger compactorBusy = new AtomicInteger(-1); + final ConcurrentHashMap<String,Long> compactorBusy = new ConcurrentHashMap<>(); final long expectedEntriesRead = 18432; final long expectedEntriesWritten = 13312; @@ -149,12 +163,13 @@ public class ExternalCompactionProgressIT extends AccumuloClusterHarness { EnumSet.of(IteratorUtil.IteratorScope.majc)); log.info("Compacting table"); - Wait.waitFor(() -> compactorBusy.get() == 0, 30_000, CHECKER_THREAD_SLEEP_MS, - "Compactor busy metric should be false initially"); + Wait.waitFor(() -> computeBusyCount(GROUP1, compactorBusy) == 0, 30_000, + CHECKER_THREAD_SLEEP_MS, "Compactor busy metric should be false initially"); compact(client, table, 2, GROUP1, false); - Wait.waitFor(() -> compactorBusy.get() == 1, 30_000, CHECKER_THREAD_SLEEP_MS, + Wait.waitFor(() -> computeBusyCount(GROUP1, compactorBusy) == 1, 30_000, + CHECKER_THREAD_SLEEP_MS, "Compactor busy metric should be true after starting compaction"); Wait.waitFor(() -> { @@ -170,7 +185,8 @@ public class ExternalCompactionProgressIT extends AccumuloClusterHarness { }, 30_000, CHECKER_THREAD_SLEEP_MS, "Entries read and written metrics values did not match expected values"); - Wait.waitFor(() -> compactorBusy.get() == 0, 30_000, CHECKER_THREAD_SLEEP_MS, + Wait.waitFor(() -> computeBusyCount(GROUP1, compactorBusy) == 0, 30_000, + CHECKER_THREAD_SLEEP_MS, "Compactor busy metric should be false once 
compaction completes"); log.info("Done Compacting table"); @@ -189,7 +205,7 @@ public class ExternalCompactionProgressIT extends AccumuloClusterHarness { * @param compactorBusy this is set to the value of the compactor busy metric */ private static Thread getMetricsCheckerThread(AtomicLong totalEntriesRead, - AtomicLong totalEntriesWritten, AtomicInteger compactorBusy) { + AtomicLong totalEntriesWritten, ConcurrentHashMap<String,Long> compactorBusy) { return Threads.createThread("metric-tailer", () -> { log.info("Starting metric tailer"); @@ -215,7 +231,13 @@ public class ExternalCompactionProgressIT extends AccumuloClusterHarness { totalEntriesWritten.addAndGet(value); break; case MetricsProducer.METRICS_COMPACTOR_BUSY: - compactorBusy.set(value); + // expect these tags to be present, so have the test fail w/ NPE if they are not + var host = Objects.requireNonNull(metric.getTags().get("host")); + var port = Objects.requireNonNull(metric.getTags().get("port")); + var resourceGroup = Objects.requireNonNull(metric.getTags().get("resource.group")); + var key = resourceGroup + ":" + host + ":" + port; + log.debug("setting busy count {} {}", key, value); + compactorBusy.put(key, (long) value); break; } }