This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 4099860261 Updated coordinator log warning to account for busy compactors (#4372) 4099860261 is described below commit 4099860261a6cdb68700176ced44eb0519420e88 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Mon Mar 18 11:00:08 2024 -0400 Updated coordinator log warning to account for busy compactors (#4372) Modified the logic in CompactionCoordinator to only warn about compactors not checking in when there are idle compactors for that group. Refactored code to remove a TODO. Fixes #4219 --- .../coordinator/CompactionCoordinator.java | 84 ++++++++++++++-------- .../compaction/CompactionCoordinatorTest.java | 12 ++-- 2 files changed, 64 insertions(+), 32 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index 178b4f1e95..48419a47a0 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -18,11 +18,9 @@ */ package org.apache.accumulo.manager.compaction.coordinator; -import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.groupingBy; import static java.util.stream.Collectors.toList; -import static java.util.stream.Collectors.toMap; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.COMPACTED; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.ECOMP; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES; @@ -37,12 +35,15 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; 
+import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -163,7 +164,7 @@ public class CompactionCoordinator private final CompactionJobQueues jobQueues; private final AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances; // Exposed for tests - protected volatile Boolean shutdown = false; + protected CountDownLatch shutdown = new CountDownLatch(1); private final ScheduledThreadPoolExecutor schedExecutor; @@ -220,7 +221,7 @@ public class CompactionCoordinator } public void shutdown() { - shutdown = true; + shutdown.countDown(); var localThread = serviceThread; if (localThread != null) { try { @@ -243,6 +244,28 @@ public class CompactionCoordinator ThreadPools.watchNonCriticalScheduledTask(future); } + protected void startIdleCompactionWatcher() { + + ScheduledFuture<?> future = schedExecutor.scheduleWithFixedDelay(this::idleCompactionWarning, + getTServerCheckInterval(), getTServerCheckInterval(), TimeUnit.MILLISECONDS); + ThreadPools.watchNonCriticalScheduledTask(future); + } + + private void idleCompactionWarning() { + + long now = System.currentTimeMillis(); + Map<String,Set<HostAndPort>> idleCompactors = getIdleCompactors(); + TIME_COMPACTOR_LAST_CHECKED.forEach((groupName, lastCheckTime) -> { + if ((now - lastCheckTime) > getMissingCompactorWarningTime() + && jobQueues.getQueuedJobs(groupName) > 0 + && idleCompactors.containsKey(groupName.canonical())) { + LOG.warn("No compactors have checked in with coordinator for group {} in {}ms", groupName, + getMissingCompactorWarningTime()); + } + }); + + } + @Override public void run() { @@ -270,35 +293,40 @@ public class CompactionCoordinator 
startDeadCompactionDetector(); - // ELASTICITY_TODO the main function of the following loop was getting group summaries from - // tservers. Its no longer doing that. May be best to remove the loop and make the remaining - // task a scheduled one. - - LOG.info("Starting loop to check for compactors not checking in"); - while (!shutdown) { - long start = System.currentTimeMillis(); - - long now = System.currentTimeMillis(); - TIME_COMPACTOR_LAST_CHECKED.forEach((k, v) -> { - if ((now - v) > getMissingCompactorWarningTime()) { - // ELASTICITY_TODO may want to consider of the group has any jobs queued OR if the group - // still exist in configuration - LOG.warn("No compactors have checked in with coordinator for group {} in {}ms", k, - getMissingCompactorWarningTime()); - } - }); + startIdleCompactionWatcher(); - long checkInterval = getTServerCheckInterval(); - long duration = (System.currentTimeMillis() - start); - if (checkInterval - duration > 0) { - LOG.debug("Waiting {}ms for next group check", (checkInterval - duration)); - UtilWaitThread.sleep(checkInterval - duration); - } + try { + shutdown.await(); + } catch (InterruptedException e) { + LOG.warn("Interrupted waiting for shutdown latch.", e); } LOG.info("Shutting down"); } + private Map<String,Set<HostAndPort>> getIdleCompactors() { + + Map<String,Set<HostAndPort>> allCompactors = new HashMap<>(); + ExternalCompactionUtil.getCompactorAddrs(ctx) + .forEach((group, compactorList) -> allCompactors.put(group, new HashSet<>(compactorList))); + + Set<String> emptyQueues = new HashSet<>(); + + // Remove all of the compactors that are running a compaction + RUNNING_CACHE.values().forEach(rc -> { + Set<HostAndPort> busyCompactors = allCompactors.get(rc.getGroupName()); + if (busyCompactors != null + && busyCompactors.remove(HostAndPort.fromString(rc.getCompactorAddress()))) { + if (busyCompactors.isEmpty()) { + emptyQueues.add(rc.getGroupName()); + } + } + }); + // Remove entries with empty queues + 
emptyQueues.forEach(e -> allCompactors.remove(e)); + return allCompactors; + } + protected void startDeadCompactionDetector() { deadCompactionDetector.start(); } @@ -674,7 +702,7 @@ public class CompactionCoordinator var localFates = fateInstances.get(); while (localFates == null) { UtilWaitThread.sleep(100); - if (shutdown) { + if (shutdown.getCount() == 0) { return; } localFates = fateInstances.get(); diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java index 3619215985..20f5cdc17e 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java @@ -117,10 +117,7 @@ public class CompactionCoordinatorTest { @Override protected long getTServerCheckInterval() { - // This is called from CompactionCoordinator.run(). Setting shutdown to true - // here will exit the loop in run() - this.shutdown = true; - return 0L; + return 5000L; } @Override @@ -129,6 +126,13 @@ public class CompactionCoordinatorTest { @Override protected void startRunningCleaner(ScheduledThreadPoolExecutor schedExecutor) {} + @Override + protected void startIdleCompactionWatcher() { + // This is called from CompactionCoordinator.run(). Counting down + // the latch will exit the run method + this.shutdown.countDown(); + } + @Override public void compactionCompleted(TInfo tinfo, TCredentials credentials, String externalCompactionId, TKeyExtent textent, TCompactionStats stats)