This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new f22e774ef6 Stop tracking last compactor check-in for non-existent 
groups (#4403)
f22e774ef6 is described below

commit f22e774ef6544343ef9d467e8620161eae0f3824
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Tue Aug 20 16:48:03 2024 -0400

    Stop tracking last compactor check-in for non-existent groups (#4403)
    
    Consolidated internal cleanup methods in the Coordinator.
    Added logic to stop tracking compactor check-in times for
    groups that have been removed.
---
 .../coordinator/CompactionCoordinator.java         | 220 +++++++++++++++------
 .../coordinator/DeadCompactionDetector.java        |   3 +-
 .../compaction/CompactionCoordinatorTest.java      |  16 +-
 3 files changed, 169 insertions(+), 70 deletions(-)

diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index 92bc94f4ab..9b31ca33f1 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -70,6 +70,7 @@ import 
org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate;
 import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
 import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList;
 import org.apache.accumulo.core.compaction.thrift.TNextCompactionJob;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.NamespaceId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
@@ -98,6 +99,8 @@ import org.apache.accumulo.core.metrics.MetricsProducer;
 import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
 import org.apache.accumulo.core.spi.compaction.CompactionJob;
 import org.apache.accumulo.core.spi.compaction.CompactionKind;
+import org.apache.accumulo.core.spi.compaction.CompactionPlanner;
+import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
 import org.apache.accumulo.core.spi.compaction.CompactorGroupId;
 import org.apache.accumulo.core.tabletserver.thrift.InputFile;
 import org.apache.accumulo.core.tabletserver.thrift.IteratorConfig;
@@ -107,6 +110,8 @@ import 
org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
 import org.apache.accumulo.core.util.Retry;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.util.cache.Caches.CacheName;
+import org.apache.accumulo.core.util.compaction.CompactionPlannerInitParams;
+import org.apache.accumulo.core.util.compaction.CompactionServicesConfig;
 import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
 import org.apache.accumulo.core.util.compaction.RunningCompaction;
 import org.apache.accumulo.core.util.threads.ThreadPools;
@@ -120,6 +125,7 @@ import 
org.apache.accumulo.manager.compaction.coordinator.commit.RenameCompactio
 import org.apache.accumulo.manager.compaction.queue.CompactionJobPriorityQueue;
 import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues;
 import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.ServiceEnvironmentImpl;
 import org.apache.accumulo.server.compaction.CompactionConfigStorage;
 import org.apache.accumulo.server.compaction.CompactionPluginUtils;
 import org.apache.accumulo.server.security.SecurityOperation;
@@ -163,7 +169,6 @@ public class CompactionCoordinator
       new ConcurrentHashMap<>();
 
   /* Map of group name to last time compactor called to get a compaction job */
-  // ELASTICITY_TODO #4403 need to clean out groups that are no longer 
configured..
   private final Map<CompactorGroupId,Long> TIME_COMPACTOR_LAST_CHECKED = new 
ConcurrentHashMap<>();
 
   private final ServerContext ctx;
@@ -186,6 +191,8 @@ public class CompactionCoordinator
   private final LoadingCache<String,Integer> compactorCounts;
   private final int jobQueueInitialSize;
 
+  private volatile long coordinatorStartTime;
+
   public CompactionCoordinator(ServerContext ctx, SecurityOperation security,
       AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances, 
Manager manager) {
     this.ctx = ctx;
@@ -253,44 +260,23 @@ public class CompactionCoordinator
     }
   }
 
-  protected void startCompactionCleaner(ScheduledThreadPoolExecutor 
schedExecutor) {
-    ScheduledFuture<?> future =
-        schedExecutor.scheduleWithFixedDelay(this::cleanUpCompactors, 0, 5, 
TimeUnit.MINUTES);
+  protected void startCompactorZKCleaner(ScheduledThreadPoolExecutor 
schedExecutor) {
+    ScheduledFuture<?> future = schedExecutor
+        .scheduleWithFixedDelay(this::cleanUpEmptyCompactorPathInZK, 0, 5, 
TimeUnit.MINUTES);
     ThreadPools.watchNonCriticalScheduledTask(future);
   }
 
-  protected void startRunningCleaner(ScheduledThreadPoolExecutor 
schedExecutor) {
+  protected void startInternalStateCleaner(ScheduledThreadPoolExecutor 
schedExecutor) {
     ScheduledFuture<?> future =
-        schedExecutor.scheduleWithFixedDelay(this::cleanUpRunning, 0, 5, 
TimeUnit.MINUTES);
-    ThreadPools.watchNonCriticalScheduledTask(future);
-  }
-
-  protected void startIdleCompactionWatcher() {
-
-    ScheduledFuture<?> future = 
schedExecutor.scheduleWithFixedDelay(this::idleCompactionWarning,
-        getTServerCheckInterval(), getTServerCheckInterval(), 
TimeUnit.MILLISECONDS);
+        schedExecutor.scheduleWithFixedDelay(this::cleanUpInternalState, 0, 5, 
TimeUnit.MINUTES);
     ThreadPools.watchNonCriticalScheduledTask(future);
   }
 
-  private void idleCompactionWarning() {
-
-    long now = System.currentTimeMillis();
-    Map<String,Set<HostAndPort>> idleCompactors = getIdleCompactors();
-    TIME_COMPACTOR_LAST_CHECKED.forEach((groupName, lastCheckTime) -> {
-      if ((now - lastCheckTime) > getMissingCompactorWarningTime()
-          && jobQueues.getQueuedJobs(groupName) > 0
-          && idleCompactors.containsKey(groupName.canonical())) {
-        LOG.warn("No compactors have checked in with coordinator for group {} 
in {}ms", groupName,
-            getMissingCompactorWarningTime());
-      }
-    });
-
-  }
-
   @Override
   public void run() {
-    startCompactionCleaner(schedExecutor);
-    startRunningCleaner(schedExecutor);
+
+    this.coordinatorStartTime = System.currentTimeMillis();
+    startCompactorZKCleaner(schedExecutor);
 
     // On a re-start of the coordinator it's possible that external 
compactions are in-progress.
     // Attempt to get the running compactions on the compactors and then 
resolve which tserver
@@ -312,8 +298,7 @@ public class CompactionCoordinator
     }
 
     startDeadCompactionDetector();
-
-    startIdleCompactionWatcher();
+    startInternalStateCleaner(schedExecutor);
 
     try {
       shutdown.await();
@@ -324,13 +309,14 @@ public class CompactionCoordinator
     LOG.info("Shutting down");
   }
 
-  private Map<String,Set<HostAndPort>> getIdleCompactors() {
+  private Map<String,Set<HostAndPort>>
+      getIdleCompactors(Map<String,Set<HostAndPort>> runningCompactors) {
 
-    Map<String,Set<HostAndPort>> allCompactors = new HashMap<>();
-    ExternalCompactionUtil.getCompactorAddrs(ctx)
+    final Map<String,Set<HostAndPort>> allCompactors = new HashMap<>();
+    runningCompactors
         .forEach((group, compactorList) -> allCompactors.put(group, new 
HashSet<>(compactorList)));
 
-    Set<String> emptyQueues = new HashSet<>();
+    final Set<String> emptyQueues = new HashSet<>();
 
     // Remove all of the compactors that are running a compaction
     RUNNING_CACHE.values().forEach(rc -> {
@@ -939,30 +925,6 @@ public class CompactionCoordinator
     }
   }
 
-  /**
-   * The RUNNING_CACHE set may contain external compactions that are not 
actually running. This
-   * method periodically cleans those up.
-   */
-  public void cleanUpRunning() {
-
-    // grab a snapshot of the ids in the set before reading the metadata 
table. This is done to
-    // avoid removing things that are added while reading the metadata.
-    Set<ExternalCompactionId> idsSnapshot = Set.copyOf(RUNNING_CACHE.keySet());
-
-    // grab the ids that are listed as running in the metadata table. It 
important that this is done
-    // after getting the snapshot.
-    Set<ExternalCompactionId> idsInMetadata = readExternalCompactionIds();
-
-    var idsToRemove = Sets.difference(idsSnapshot, idsInMetadata);
-
-    // remove ids that are in the running set but not in the metadata table
-    idsToRemove.forEach(this::recordCompletion);
-
-    if (idsToRemove.size() > 0) {
-      LOG.debug("Removed stale entries from RUNNING_CACHE : {}", idsToRemove);
-    }
-  }
-
   /**
    * Return information about running compactions
    *
@@ -1049,6 +1011,11 @@ public class CompactionCoordinator
     return ExternalCompactionUtil.getCompactionsRunningOnCompactors(this.ctx);
   }
 
+  /* Method exists to be overridden in test to hide static method */
+  protected Map<String,Set<HostAndPort>> getRunningCompactors() {
+    return ExternalCompactionUtil.getCompactorAddrs(this.ctx);
+  }
+
   /* Method exists to be overridden in test to hide static method */
   protected void cancelCompactionOnCompactor(String address, String 
externalCompactionId) {
     HostAndPort hostPort = HostAndPort.fromString(address);
@@ -1065,7 +1032,7 @@ public class CompactionCoordinator
     }
   }
 
-  private void cleanUpCompactors() {
+  private void cleanUpEmptyCompactorPathInZK() {
     final String compactorQueuesPath = this.ctx.getZooKeeperRoot() + 
Constants.ZCOMPACTORS;
 
     final var zoorw = this.ctx.getZooReaderWriter();
@@ -1118,6 +1085,137 @@ public class CompactionCoordinator
     }
   }
 
+  private Set<CompactorGroupId> getCompactionServicesConfigurationGroups()
+      throws ReflectiveOperationException, IllegalArgumentException, 
SecurityException {
+
+    Set<CompactorGroupId> groups = new HashSet<>();
+    AccumuloConfiguration config = ctx.getConfiguration();
+    CompactionServicesConfig servicesConfig = new 
CompactionServicesConfig(config);
+
+    for (var entry : servicesConfig.getPlanners().entrySet()) {
+      String serviceId = entry.getKey();
+      String plannerClassName = entry.getValue();
+
+      Class<? extends CompactionPlanner> plannerClass =
+          Class.forName(plannerClassName).asSubclass(CompactionPlanner.class);
+      CompactionPlanner planner = 
plannerClass.getDeclaredConstructor().newInstance();
+
+      var initParams = new 
CompactionPlannerInitParams(CompactionServiceId.of(serviceId),
+          servicesConfig.getPlannerPrefix(serviceId), 
servicesConfig.getOptions().get(serviceId),
+          new ServiceEnvironmentImpl(ctx));
+
+      planner.init(initParams);
+
+      groups.addAll(initParams.getRequestedGroups());
+    }
+    return groups;
+  }
+
+  public void cleanUpInternalState() {
+
+    // This method does the following:
+    //
+    // 1. Removes entries from RUNNING_CACHE that are not really running
+    // 2. Cancels running compactions for groups that are not in the current 
configuration
+    // 3. Remove groups not in configuration from TIME_COMPACTOR_LAST_CHECKED
+    // 4. Log groups with no compactors
+    // 5. Log compactors with no groups
+    // 6. Log groups with compactors and queued jobs that have not checked in
+
+    // grab a snapshot of the ids in the set before reading the metadata 
table. This is done to
+    // avoid removing things that are added while reading the metadata.
+    final Set<ExternalCompactionId> idsSnapshot = 
Set.copyOf(RUNNING_CACHE.keySet());
+
+    // grab the ids that are listed as running in the metadata table. It is
important that this is done
+    // after getting the snapshot.
+    final Set<ExternalCompactionId> idsInMetadata = 
readExternalCompactionIds();
+
+    final Set<ExternalCompactionId> idsToRemove = Sets.difference(idsSnapshot, 
idsInMetadata);
+
+    // remove ids that are in the running set but not in the metadata table
+    idsToRemove.forEach(this::recordCompletion);
+    if (idsToRemove.size() > 0) {
+      LOG.debug("Removed stale entries from RUNNING_CACHE : {}", idsToRemove);
+    }
+
+    // Get the set of groups being referenced in the current configuration
+    Set<CompactorGroupId> groupsInConfiguration = null;
+    try {
+      groupsInConfiguration = getCompactionServicesConfigurationGroups();
+    } catch (RuntimeException | ReflectiveOperationException e) {
+      LOG.error(
+          "Error getting groups from the compaction services configuration. 
Unable to clean up internal state.",
+          e);
+      return;
+    }
+
+    // Compaction jobs are created in the TabletGroupWatcher and added to the 
Coordinator
+    // via the addJobs method which adds the job to the CompactionJobQueues 
object.
+    final Set<CompactorGroupId> groupsWithJobs = jobQueues.getQueueIds();
+
+    final Set<CompactorGroupId> jobGroupsNotInConfiguration =
+        Sets.difference(groupsWithJobs, groupsInConfiguration);
+
+    if (jobGroupsNotInConfiguration != null && 
!jobGroupsNotInConfiguration.isEmpty()) {
+      RUNNING_CACHE.values().forEach(rc -> {
+        if 
(jobGroupsNotInConfiguration.contains(CompactorGroupId.of(rc.getGroupName()))) {
+          LOG.warn(
+              "External compaction {} running in group {} on compactor {},"
+                  + " but group not found in current configuration. Failing 
compaction...",
+              rc.getJob().getExternalCompactionId(), rc.getGroupName(), 
rc.getCompactorAddress());
+          cancelCompactionOnCompactor(rc.getCompactorAddress(),
+              rc.getJob().getExternalCompactionId());
+        }
+      });
+
+      final Set<CompactorGroupId> trackedGroups = 
Set.copyOf(TIME_COMPACTOR_LAST_CHECKED.keySet());
+      TIME_COMPACTOR_LAST_CHECKED.keySet().retainAll(groupsInConfiguration);
+      LOG.debug("No longer tracking compactor check-in times for groups: {}",
+          Sets.difference(trackedGroups, 
TIME_COMPACTOR_LAST_CHECKED.keySet()));
+    }
+
+    final Map<String,Set<HostAndPort>> runningCompactors = 
getRunningCompactors();
+
+    final Set<CompactorGroupId> runningCompactorGroups = new HashSet<>();
+    runningCompactors.keySet()
+        .forEach(group -> 
runningCompactorGroups.add(CompactorGroupId.of(group)));
+
+    final Set<CompactorGroupId> groupsWithNoCompactors =
+        Sets.difference(groupsInConfiguration, runningCompactorGroups);
+    if (groupsWithNoCompactors != null && !groupsWithNoCompactors.isEmpty()) {
+      for (CompactorGroupId group : groupsWithNoCompactors) {
+        long queuedJobCount = jobQueues.getQueuedJobs(group);
+        if (queuedJobCount > 0) {
+          LOG.warn("Compactor group {} has {} queued compactions but no 
running compactors", group,
+              queuedJobCount);
+        }
+      }
+    }
+
+    final Set<CompactorGroupId> compactorsWithNoGroups =
+        Sets.difference(runningCompactorGroups, groupsInConfiguration);
+    if (compactorsWithNoGroups != null && !compactorsWithNoGroups.isEmpty()) {
+      LOG.warn(
+          "The following groups have running compactors, but are not in the 
current configuration: {}",
+          compactorsWithNoGroups);
+    }
+
+    final long now = System.currentTimeMillis();
+    final long warningTime = getMissingCompactorWarningTime();
+    Map<String,Set<HostAndPort>> idleCompactors = 
getIdleCompactors(runningCompactors);
+    for (CompactorGroupId groupName : groupsInConfiguration) {
+      long lastCheckTime =
+          TIME_COMPACTOR_LAST_CHECKED.getOrDefault(groupName, 
coordinatorStartTime);
+      if ((now - lastCheckTime) > warningTime && 
jobQueues.getQueuedJobs(groupName) > 0
+          && idleCompactors.containsKey(groupName.canonical())) {
+        LOG.warn(
+            "The group {} has queued jobs and {} idle compactors, however none 
have checked in "
+                + "with coordinator for {}ms",
+            groupName, idleCompactors.get(groupName.canonical()).size(), 
warningTime);
+      }
+    }
+  }
+
   private static Set<StoredTabletFile> 
getFilesReservedBySelection(TabletMetadata tabletMetadata,
       SteadyTime steadyTime, ServerContext ctx) {
     if (tabletMetadata.getSelectedFiles() == null) {
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java
index bf702b0db7..7dbf45a3d6 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java
@@ -230,8 +230,7 @@ public class DeadCompactionDetector {
       this.deadCompactions.keySet().removeAll(toFail);
     }
 
-    // Find and delete any known tables that have unreferenced
-    // compaction tmp files.
+    // Find and delete compaction tmp files that are unreferenced
     if (!tablesWithUnreferencedTmpFiles.isEmpty()) {
 
       Set<TableId> copy = new HashSet<>();
diff --git 
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
 
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
index 58a592f036..90c100aa72 100644
--- 
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
+++ 
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
@@ -148,13 +148,10 @@ public class CompactionCoordinatorTest {
     }
 
     @Override
-    protected void startCompactionCleaner(ScheduledThreadPoolExecutor 
schedExecutor) {}
+    protected void startCompactorZKCleaner(ScheduledThreadPoolExecutor 
schedExecutor) {}
 
     @Override
-    protected void startRunningCleaner(ScheduledThreadPoolExecutor 
schedExecutor) {}
-
-    @Override
-    protected void startIdleCompactionWatcher() {
+    protected void startInternalStateCleaner(ScheduledThreadPoolExecutor 
schedExecutor) {
       // This is called from CompactionCoordinator.run(). Counting down
       // the latch will exit the run method
       this.shutdown.countDown();
@@ -196,6 +193,11 @@ public class CompactionCoordinatorTest {
       return runningCompactions;
     }
 
+    @Override
+    protected Map<String,Set<HostAndPort>> getRunningCompactors() {
+      return Map.of();
+    }
+
     @Override
     protected CompactionMetadata reserveCompaction(MetaJob metaJob, String 
compactorAddress,
         ExternalCompactionId externalCompactionId) {
@@ -434,13 +436,13 @@ public class CompactionCoordinatorTest {
     coordinator.getRunning().put(ecid2, new RunningCompaction(new 
TExternalCompaction()));
     coordinator.getRunning().put(ecid3, new RunningCompaction(new 
TExternalCompaction()));
 
-    coordinator.cleanUpRunning();
+    coordinator.cleanUpInternalState();
 
     assertEquals(Set.of(ecid1, ecid2, ecid3), 
coordinator.getRunning().keySet());
 
     coordinator.setMetadataCompactionIds(Set.of(ecid1, ecid2));
 
-    coordinator.cleanUpRunning();
+    coordinator.cleanUpInternalState();
 
     assertEquals(Set.of(ecid1, ecid2), coordinator.getRunning().keySet());
 

Reply via email to