This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 890fc4741a fixes ExternalCompactionProgressIT (#4665)
890fc4741a is described below

commit 890fc4741a7984407f303bfadd8ee858aad0a7f5
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Wed Jun 12 12:14:03 2024 -0400

    fixes ExternalCompactionProgressIT (#4665)
    
    This test was validating the compactor busy count metric.  The test has 9
    compactor processes running.  When it saw a busy count from any of the 9 it
    would set an atomic long.  This is made it likely that the 8 of 9 not busy
    compactors would set zero making the test flaky.  Replaced the atomic long 
w/ a
    concurrent map where each compactor process has an entry in the map for its
    busy count.
---
 .../compaction/ExternalCompactionProgressIT.java   | 38 +++++++++++++++++-----
 1 file changed, 30 insertions(+), 8 deletions(-)

diff --git 
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
 
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
index 3b3042a11f..a797faa36e 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
@@ -34,10 +34,11 @@ import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.accumulo.core.client.Accumulo;
@@ -123,13 +124,26 @@ public class ExternalCompactionProgressIT extends 
AccumuloClusterHarness {
     cfg.setSystemProperties(sysProps);
   }
 
+  private static long computeBusyCount(String resourceGroup,
+      ConcurrentHashMap<String,Long> compactorBusy) {
+    var stats =
+        compactorBusy.entrySet().stream().filter(e -> 
e.getKey().startsWith(resourceGroup + ":"))
+            .mapToLong(Map.Entry::getValue).summaryStatistics();
+    if (stats.getCount() == 0) {
+      // signifies nothing was present, this differentiates between the case 
where things are
+      // present w/ a zero value
+      return -1;
+    }
+    return stats.getSum();
+  }
+
   @Test
   public void testProgressViaMetrics() throws Exception {
     String table = this.getUniqueNames(1)[0];
 
     final AtomicLong totalEntriesRead = new AtomicLong(0);
     final AtomicLong totalEntriesWritten = new AtomicLong(0);
-    final AtomicInteger compactorBusy = new AtomicInteger(-1);
+    final ConcurrentHashMap<String,Long> compactorBusy = new 
ConcurrentHashMap<>();
     final long expectedEntriesRead = 18432;
     final long expectedEntriesWritten = 13312;
 
@@ -149,12 +163,13 @@ public class ExternalCompactionProgressIT extends 
AccumuloClusterHarness {
           EnumSet.of(IteratorUtil.IteratorScope.majc));
       log.info("Compacting table");
 
-      Wait.waitFor(() -> compactorBusy.get() == 0, 30_000, 
CHECKER_THREAD_SLEEP_MS,
-          "Compactor busy metric should be false initially");
+      Wait.waitFor(() -> computeBusyCount(GROUP1, compactorBusy) == 0, 30_000,
+          CHECKER_THREAD_SLEEP_MS, "Compactor busy metric should be false 
initially");
 
       compact(client, table, 2, GROUP1, false);
 
-      Wait.waitFor(() -> compactorBusy.get() == 1, 30_000, 
CHECKER_THREAD_SLEEP_MS,
+      Wait.waitFor(() -> computeBusyCount(GROUP1, compactorBusy) == 1, 30_000,
+          CHECKER_THREAD_SLEEP_MS,
           "Compactor busy metric should be true after starting compaction");
 
       Wait.waitFor(() -> {
@@ -170,7 +185,8 @@ public class ExternalCompactionProgressIT extends 
AccumuloClusterHarness {
       }, 30_000, CHECKER_THREAD_SLEEP_MS,
           "Entries read and written metrics values did not match expected 
values");
 
-      Wait.waitFor(() -> compactorBusy.get() == 0, 30_000, 
CHECKER_THREAD_SLEEP_MS,
+      Wait.waitFor(() -> computeBusyCount(GROUP1, compactorBusy) == 0, 30_000,
+          CHECKER_THREAD_SLEEP_MS,
           "Compactor busy metric should be false once compaction completes");
 
       log.info("Done Compacting table");
@@ -189,7 +205,7 @@ public class ExternalCompactionProgressIT extends 
AccumuloClusterHarness {
    * @param compactorBusy this is set to the value of the compactor busy metric
    */
   private static Thread getMetricsCheckerThread(AtomicLong totalEntriesRead,
-      AtomicLong totalEntriesWritten, AtomicInteger compactorBusy) {
+      AtomicLong totalEntriesWritten, ConcurrentHashMap<String,Long> 
compactorBusy) {
     return Threads.createThread("metric-tailer", () -> {
       log.info("Starting metric tailer");
 
@@ -215,7 +231,13 @@ public class ExternalCompactionProgressIT extends 
AccumuloClusterHarness {
               totalEntriesWritten.addAndGet(value);
               break;
             case MetricsProducer.METRICS_COMPACTOR_BUSY:
-              compactorBusy.set(value);
+              // expect these tags to be present, so have the test fail w/ NPE 
if they are not
+              var host = Objects.requireNonNull(metric.getTags().get("host"));
+              var port = Objects.requireNonNull(metric.getTags().get("port"));
+              var resourceGroup = 
Objects.requireNonNull(metric.getTags().get("resource.group"));
+              var key = resourceGroup + ":" + host + ":" + port;
+              log.debug("setting busy count {} {}", key, value);
+              compactorBusy.put(key, (long) value);
               break;
           }
         }

Reply via email to