This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 4099860261 Updated coordinator log warning to account for busy compactors (#4372)
4099860261 is described below

commit 4099860261a6cdb68700176ced44eb0519420e88
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Mon Mar 18 11:00:08 2024 -0400

    Updated coordinator log warning to account for busy compactors (#4372)
    
    Modified the logic in CompactionCoordinator to only warn about
    compactors not checking in when there are idle compactors for
    that group. Refactored code to remove a TODO.
    
    Fixes #4219
---
 .../coordinator/CompactionCoordinator.java         | 84 ++++++++++++++--------
 .../compaction/CompactionCoordinatorTest.java      | 12 ++--
 2 files changed, 64 insertions(+), 32 deletions(-)

diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index 178b4f1e95..48419a47a0 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -18,11 +18,9 @@
  */
 package org.apache.accumulo.manager.compaction.coordinator;
 
-import static java.util.concurrent.TimeUnit.MINUTES;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static java.util.stream.Collectors.groupingBy;
 import static java.util.stream.Collectors.toList;
-import static java.util.stream.Collectors.toMap;
 import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.COMPACTED;
 import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.ECOMP;
 import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
@@ -37,12 +35,15 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -163,7 +164,7 @@ public class CompactionCoordinator
   private final CompactionJobQueues jobQueues;
   private final AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances;
   // Exposed for tests
-  protected volatile Boolean shutdown = false;
+  protected CountDownLatch shutdown = new CountDownLatch(1);
 
   private final ScheduledThreadPoolExecutor schedExecutor;
 
@@ -220,7 +221,7 @@ public class CompactionCoordinator
   }
 
   public void shutdown() {
-    shutdown = true;
+    shutdown.countDown();
     var localThread = serviceThread;
     if (localThread != null) {
       try {
@@ -243,6 +244,28 @@ public class CompactionCoordinator
     ThreadPools.watchNonCriticalScheduledTask(future);
   }
 
+  protected void startIdleCompactionWatcher() {
+
+    ScheduledFuture<?> future = schedExecutor.scheduleWithFixedDelay(this::idleCompactionWarning,
+        getTServerCheckInterval(), getTServerCheckInterval(), TimeUnit.MILLISECONDS);
+    ThreadPools.watchNonCriticalScheduledTask(future);
+  }
+
+  private void idleCompactionWarning() {
+
+    long now = System.currentTimeMillis();
+    Map<String,Set<HostAndPort>> idleCompactors = getIdleCompactors();
+    TIME_COMPACTOR_LAST_CHECKED.forEach((groupName, lastCheckTime) -> {
+      if ((now - lastCheckTime) > getMissingCompactorWarningTime()
+          && jobQueues.getQueuedJobs(groupName) > 0
+          && idleCompactors.containsKey(groupName.canonical())) {
+        LOG.warn("No compactors have checked in with coordinator for group {} in {}ms", groupName,
+            getMissingCompactorWarningTime());
+      }
+    });
+
+  }
+
   @Override
   public void run() {
 
@@ -270,35 +293,40 @@ public class CompactionCoordinator
 
     startDeadCompactionDetector();
 
-    // ELASTICITY_TODO the main function of the following loop was getting group summaries from
-    // tservers. Its no longer doing that. May be best to remove the loop and make the remaining
-    // task a scheduled one.
-
-    LOG.info("Starting loop to check for compactors not checking in");
-    while (!shutdown) {
-      long start = System.currentTimeMillis();
-
-      long now = System.currentTimeMillis();
-      TIME_COMPACTOR_LAST_CHECKED.forEach((k, v) -> {
-        if ((now - v) > getMissingCompactorWarningTime()) {
-          // ELASTICITY_TODO may want to consider of the group has any jobs queued OR if the group
-          // still exist in configuration
-          LOG.warn("No compactors have checked in with coordinator for group {} in {}ms", k,
-              getMissingCompactorWarningTime());
-        }
-      });
+    startIdleCompactionWatcher();
 
-      long checkInterval = getTServerCheckInterval();
-      long duration = (System.currentTimeMillis() - start);
-      if (checkInterval - duration > 0) {
-        LOG.debug("Waiting {}ms for next group check", (checkInterval - duration));
-        UtilWaitThread.sleep(checkInterval - duration);
-      }
+    try {
+      shutdown.await();
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted waiting for shutdown latch.", e);
     }
 
     LOG.info("Shutting down");
   }
 
+  private Map<String,Set<HostAndPort>> getIdleCompactors() {
+
+    Map<String,Set<HostAndPort>> allCompactors = new HashMap<>();
+    ExternalCompactionUtil.getCompactorAddrs(ctx)
+        .forEach((group, compactorList) -> allCompactors.put(group, new HashSet<>(compactorList)));
+
+    Set<String> emptyQueues = new HashSet<>();
+
+    // Remove all of the compactors that are running a compaction
+    RUNNING_CACHE.values().forEach(rc -> {
+      Set<HostAndPort> busyCompactors = allCompactors.get(rc.getGroupName());
+      if (busyCompactors != null
+          && busyCompactors.remove(HostAndPort.fromString(rc.getCompactorAddress()))) {
+        if (busyCompactors.isEmpty()) {
+          emptyQueues.add(rc.getGroupName());
+        }
+      }
+    });
+    // Remove entries with empty queues
+    emptyQueues.forEach(e -> allCompactors.remove(e));
+    return allCompactors;
+  }
+
   protected void startDeadCompactionDetector() {
     deadCompactionDetector.start();
   }
@@ -674,7 +702,7 @@ public class CompactionCoordinator
     var localFates = fateInstances.get();
     while (localFates == null) {
       UtilWaitThread.sleep(100);
-      if (shutdown) {
+      if (shutdown.getCount() == 0) {
         return;
       }
       localFates = fateInstances.get();
diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
index 3619215985..20f5cdc17e 100644
--- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
+++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
@@ -117,10 +117,7 @@ public class CompactionCoordinatorTest {
 
     @Override
     protected long getTServerCheckInterval() {
-      // This is called from CompactionCoordinator.run(). Setting shutdown to true
-      // here will exit the loop in run()
-      this.shutdown = true;
-      return 0L;
+      return 5000L;
     }
 
     @Override
@@ -129,6 +126,13 @@ public class CompactionCoordinatorTest {
     @Override
     protected void startRunningCleaner(ScheduledThreadPoolExecutor schedExecutor) {}
 
+    @Override
+    protected void startIdleCompactionWatcher() {
+      // This is called from CompactionCoordinator.run(). Counting down
+      // the latch will exit the run method
+      this.shutdown.countDown();
+    }
+
     @Override
     public void compactionCompleted(TInfo tinfo, TCredentials credentials,
         String externalCompactionId, TKeyExtent textent, TCompactionStats stats)

Reply via email to