This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new f22e774ef6 Stop tracking last compactor check-in for non-existent groups (#4403) f22e774ef6 is described below commit f22e774ef6544343ef9d467e8620161eae0f3824 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Tue Aug 20 16:48:03 2024 -0400 Stop tracking last compactor check-in for non-existent groups (#4403) Consolidated internal cleanup methods in the Coordinator. Added logic to stop tracking compactor check-in times for groups that have been removed. --- .../coordinator/CompactionCoordinator.java | 220 +++++++++++++++------ .../coordinator/DeadCompactionDetector.java | 3 +- .../compaction/CompactionCoordinatorTest.java | 16 +- 3 files changed, 169 insertions(+), 70 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index 92bc94f4ab..9b31ca33f1 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -70,6 +70,7 @@ import org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate; import org.apache.accumulo.core.compaction.thrift.TExternalCompaction; import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList; import org.apache.accumulo.core.compaction.thrift.TNextCompactionJob; +import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.NamespaceId; import org.apache.accumulo.core.dataImpl.KeyExtent; @@ -98,6 +99,8 @@ import org.apache.accumulo.core.metrics.MetricsProducer; import org.apache.accumulo.core.securityImpl.thrift.TCredentials; import org.apache.accumulo.core.spi.compaction.CompactionJob; import 
org.apache.accumulo.core.spi.compaction.CompactionKind; +import org.apache.accumulo.core.spi.compaction.CompactionPlanner; +import org.apache.accumulo.core.spi.compaction.CompactionServiceId; import org.apache.accumulo.core.spi.compaction.CompactorGroupId; import org.apache.accumulo.core.tabletserver.thrift.InputFile; import org.apache.accumulo.core.tabletserver.thrift.IteratorConfig; @@ -107,6 +110,8 @@ import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; import org.apache.accumulo.core.util.Retry; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.util.cache.Caches.CacheName; +import org.apache.accumulo.core.util.compaction.CompactionPlannerInitParams; +import org.apache.accumulo.core.util.compaction.CompactionServicesConfig; import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; import org.apache.accumulo.core.util.compaction.RunningCompaction; import org.apache.accumulo.core.util.threads.ThreadPools; @@ -120,6 +125,7 @@ import org.apache.accumulo.manager.compaction.coordinator.commit.RenameCompactio import org.apache.accumulo.manager.compaction.queue.CompactionJobPriorityQueue; import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues; import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.ServiceEnvironmentImpl; import org.apache.accumulo.server.compaction.CompactionConfigStorage; import org.apache.accumulo.server.compaction.CompactionPluginUtils; import org.apache.accumulo.server.security.SecurityOperation; @@ -163,7 +169,6 @@ public class CompactionCoordinator new ConcurrentHashMap<>(); /* Map of group name to last time compactor called to get a compaction job */ - // ELASTICITY_TODO #4403 need to clean out groups that are no longer configured.. 
private final Map<CompactorGroupId,Long> TIME_COMPACTOR_LAST_CHECKED = new ConcurrentHashMap<>(); private final ServerContext ctx; @@ -186,6 +191,8 @@ public class CompactionCoordinator private final LoadingCache<String,Integer> compactorCounts; private final int jobQueueInitialSize; + private volatile long coordinatorStartTime; + public CompactionCoordinator(ServerContext ctx, SecurityOperation security, AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances, Manager manager) { this.ctx = ctx; @@ -253,44 +260,23 @@ public class CompactionCoordinator } } - protected void startCompactionCleaner(ScheduledThreadPoolExecutor schedExecutor) { - ScheduledFuture<?> future = - schedExecutor.scheduleWithFixedDelay(this::cleanUpCompactors, 0, 5, TimeUnit.MINUTES); + protected void startCompactorZKCleaner(ScheduledThreadPoolExecutor schedExecutor) { + ScheduledFuture<?> future = schedExecutor + .scheduleWithFixedDelay(this::cleanUpEmptyCompactorPathInZK, 0, 5, TimeUnit.MINUTES); ThreadPools.watchNonCriticalScheduledTask(future); } - protected void startRunningCleaner(ScheduledThreadPoolExecutor schedExecutor) { + protected void startInternalStateCleaner(ScheduledThreadPoolExecutor schedExecutor) { ScheduledFuture<?> future = - schedExecutor.scheduleWithFixedDelay(this::cleanUpRunning, 0, 5, TimeUnit.MINUTES); - ThreadPools.watchNonCriticalScheduledTask(future); - } - - protected void startIdleCompactionWatcher() { - - ScheduledFuture<?> future = schedExecutor.scheduleWithFixedDelay(this::idleCompactionWarning, - getTServerCheckInterval(), getTServerCheckInterval(), TimeUnit.MILLISECONDS); + schedExecutor.scheduleWithFixedDelay(this::cleanUpInternalState, 0, 5, TimeUnit.MINUTES); ThreadPools.watchNonCriticalScheduledTask(future); } - private void idleCompactionWarning() { - - long now = System.currentTimeMillis(); - Map<String,Set<HostAndPort>> idleCompactors = getIdleCompactors(); - TIME_COMPACTOR_LAST_CHECKED.forEach((groupName, lastCheckTime) -> { - if ((now - 
lastCheckTime) > getMissingCompactorWarningTime() - && jobQueues.getQueuedJobs(groupName) > 0 - && idleCompactors.containsKey(groupName.canonical())) { - LOG.warn("No compactors have checked in with coordinator for group {} in {}ms", groupName, - getMissingCompactorWarningTime()); - } - }); - - } - @Override public void run() { - startCompactionCleaner(schedExecutor); - startRunningCleaner(schedExecutor); + + this.coordinatorStartTime = System.currentTimeMillis(); + startCompactorZKCleaner(schedExecutor); // On a re-start of the coordinator it's possible that external compactions are in-progress. // Attempt to get the running compactions on the compactors and then resolve which tserver @@ -312,8 +298,7 @@ public class CompactionCoordinator } startDeadCompactionDetector(); - - startIdleCompactionWatcher(); + startInternalStateCleaner(schedExecutor); try { shutdown.await(); @@ -324,13 +309,14 @@ public class CompactionCoordinator LOG.info("Shutting down"); } - private Map<String,Set<HostAndPort>> getIdleCompactors() { + private Map<String,Set<HostAndPort>> + getIdleCompactors(Map<String,Set<HostAndPort>> runningCompactors) { - Map<String,Set<HostAndPort>> allCompactors = new HashMap<>(); - ExternalCompactionUtil.getCompactorAddrs(ctx) + final Map<String,Set<HostAndPort>> allCompactors = new HashMap<>(); + runningCompactors .forEach((group, compactorList) -> allCompactors.put(group, new HashSet<>(compactorList))); - Set<String> emptyQueues = new HashSet<>(); + final Set<String> emptyQueues = new HashSet<>(); // Remove all of the compactors that are running a compaction RUNNING_CACHE.values().forEach(rc -> { @@ -939,30 +925,6 @@ public class CompactionCoordinator } } - /** - * The RUNNING_CACHE set may contain external compactions that are not actually running. This - * method periodically cleans those up. - */ - public void cleanUpRunning() { - - // grab a snapshot of the ids in the set before reading the metadata table. 
This is done to - // avoid removing things that are added while reading the metadata. - Set<ExternalCompactionId> idsSnapshot = Set.copyOf(RUNNING_CACHE.keySet()); - - // grab the ids that are listed as running in the metadata table. It important that this is done - // after getting the snapshot. - Set<ExternalCompactionId> idsInMetadata = readExternalCompactionIds(); - - var idsToRemove = Sets.difference(idsSnapshot, idsInMetadata); - - // remove ids that are in the running set but not in the metadata table - idsToRemove.forEach(this::recordCompletion); - - if (idsToRemove.size() > 0) { - LOG.debug("Removed stale entries from RUNNING_CACHE : {}", idsToRemove); - } - } - /** * Return information about running compactions * @@ -1049,6 +1011,11 @@ public class CompactionCoordinator return ExternalCompactionUtil.getCompactionsRunningOnCompactors(this.ctx); } + /* Method exists to be overridden in test to hide static method */ + protected Map<String,Set<HostAndPort>> getRunningCompactors() { + return ExternalCompactionUtil.getCompactorAddrs(this.ctx); + } + /* Method exists to be overridden in test to hide static method */ protected void cancelCompactionOnCompactor(String address, String externalCompactionId) { HostAndPort hostPort = HostAndPort.fromString(address); @@ -1065,7 +1032,7 @@ public class CompactionCoordinator } } - private void cleanUpCompactors() { + private void cleanUpEmptyCompactorPathInZK() { final String compactorQueuesPath = this.ctx.getZooKeeperRoot() + Constants.ZCOMPACTORS; final var zoorw = this.ctx.getZooReaderWriter(); @@ -1118,6 +1085,137 @@ public class CompactionCoordinator } } + private Set<CompactorGroupId> getCompactionServicesConfigurationGroups() + throws ReflectiveOperationException, IllegalArgumentException, SecurityException { + + Set<CompactorGroupId> groups = new HashSet<>(); + AccumuloConfiguration config = ctx.getConfiguration(); + CompactionServicesConfig servicesConfig = new CompactionServicesConfig(config); + + for (var entry 
: servicesConfig.getPlanners().entrySet()) { + String serviceId = entry.getKey(); + String plannerClassName = entry.getValue(); + + Class<? extends CompactionPlanner> plannerClass = + Class.forName(plannerClassName).asSubclass(CompactionPlanner.class); + CompactionPlanner planner = plannerClass.getDeclaredConstructor().newInstance(); + + var initParams = new CompactionPlannerInitParams(CompactionServiceId.of(serviceId), + servicesConfig.getPlannerPrefix(serviceId), servicesConfig.getOptions().get(serviceId), + new ServiceEnvironmentImpl(ctx)); + + planner.init(initParams); + + groups.addAll(initParams.getRequestedGroups()); + } + return groups; + } + + public void cleanUpInternalState() { + + // This method does the following: + // + // 1. Removes entries from RUNNING_CACHE that are not really running + // 2. Cancels running compactions for groups that are not in the current configuration + // 3. Remove groups not in configuration from TIME_COMPACTOR_LAST_CHECKED + // 4. Log groups with no compactors + // 5. Log compactors with no groups + // 6. Log groups with compactors and queued jobs that have not checked in + + // grab a snapshot of the ids in the set before reading the metadata table. This is done to + // avoid removing things that are added while reading the metadata. + final Set<ExternalCompactionId> idsSnapshot = Set.copyOf(RUNNING_CACHE.keySet()); + + // grab the ids that are listed as running in the metadata table. It is important that this is done + // after getting the snapshot.
+ final Set<ExternalCompactionId> idsInMetadata = readExternalCompactionIds(); + + final Set<ExternalCompactionId> idsToRemove = Sets.difference(idsSnapshot, idsInMetadata); + + // remove ids that are in the running set but not in the metadata table + idsToRemove.forEach(this::recordCompletion); + if (idsToRemove.size() > 0) { + LOG.debug("Removed stale entries from RUNNING_CACHE : {}", idsToRemove); + } + + // Get the set of groups being referenced in the current configuration + Set<CompactorGroupId> groupsInConfiguration = null; + try { + groupsInConfiguration = getCompactionServicesConfigurationGroups(); + } catch (RuntimeException | ReflectiveOperationException e) { + LOG.error( + "Error getting groups from the compaction services configuration. Unable to clean up internal state.", + e); + return; + } + + // Compaction jobs are created in the TabletGroupWatcher and added to the Coordinator + // via the addJobs method which adds the job to the CompactionJobQueues object. + final Set<CompactorGroupId> groupsWithJobs = jobQueues.getQueueIds(); + + final Set<CompactorGroupId> jobGroupsNotInConfiguration = + Sets.difference(groupsWithJobs, groupsInConfiguration); + + if (jobGroupsNotInConfiguration != null && !jobGroupsNotInConfiguration.isEmpty()) { + RUNNING_CACHE.values().forEach(rc -> { + if (jobGroupsNotInConfiguration.contains(CompactorGroupId.of(rc.getGroupName()))) { + LOG.warn( + "External compaction {} running in group {} on compactor {}," + + " but group not found in current configuration. 
Failing compaction...", + rc.getJob().getExternalCompactionId(), rc.getGroupName(), rc.getCompactorAddress()); + cancelCompactionOnCompactor(rc.getCompactorAddress(), + rc.getJob().getExternalCompactionId()); + } + }); + + final Set<CompactorGroupId> trackedGroups = Set.copyOf(TIME_COMPACTOR_LAST_CHECKED.keySet()); + TIME_COMPACTOR_LAST_CHECKED.keySet().retainAll(groupsInConfiguration); + LOG.debug("No longer tracking compactor check-in times for groups: {}", + Sets.difference(trackedGroups, TIME_COMPACTOR_LAST_CHECKED.keySet())); + } + + final Map<String,Set<HostAndPort>> runningCompactors = getRunningCompactors(); + + final Set<CompactorGroupId> runningCompactorGroups = new HashSet<>(); + runningCompactors.keySet() + .forEach(group -> runningCompactorGroups.add(CompactorGroupId.of(group))); + + final Set<CompactorGroupId> groupsWithNoCompactors = + Sets.difference(groupsInConfiguration, runningCompactorGroups); + if (groupsWithNoCompactors != null && !groupsWithNoCompactors.isEmpty()) { + for (CompactorGroupId group : groupsWithNoCompactors) { + long queuedJobCount = jobQueues.getQueuedJobs(group); + if (queuedJobCount > 0) { + LOG.warn("Compactor group {} has {} queued compactions but no running compactors", group, + queuedJobCount); + } + } + } + + final Set<CompactorGroupId> compactorsWithNoGroups = + Sets.difference(runningCompactorGroups, groupsInConfiguration); + if (compactorsWithNoGroups != null && !compactorsWithNoGroups.isEmpty()) { + LOG.warn( + "The following groups have running compactors, but are not in the current configuration: {}", + compactorsWithNoGroups); + } + + final long now = System.currentTimeMillis(); + final long warningTime = getMissingCompactorWarningTime(); + Map<String,Set<HostAndPort>> idleCompactors = getIdleCompactors(runningCompactors); + for (CompactorGroupId groupName : groupsInConfiguration) { + long lastCheckTime = + TIME_COMPACTOR_LAST_CHECKED.getOrDefault(groupName, coordinatorStartTime); + if ((now - lastCheckTime) > 
warningTime && jobQueues.getQueuedJobs(groupName) > 0 + && idleCompactors.containsKey(groupName.canonical())) { + LOG.warn( + "The group {} has queued jobs and {} idle compactors, however none have checked in " + + "with coordinator for {}ms", + groupName, idleCompactors.get(groupName.canonical()).size(), warningTime); + } + } + } + private static Set<StoredTabletFile> getFilesReservedBySelection(TabletMetadata tabletMetadata, SteadyTime steadyTime, ServerContext ctx) { if (tabletMetadata.getSelectedFiles() == null) { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java index bf702b0db7..7dbf45a3d6 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java @@ -230,8 +230,7 @@ public class DeadCompactionDetector { this.deadCompactions.keySet().removeAll(toFail); } - // Find and delete any known tables that have unreferenced - // compaction tmp files. 
+ // Find and delete compaction tmp files that are unreferenced if (!tablesWithUnreferencedTmpFiles.isEmpty()) { Set<TableId> copy = new HashSet<>(); diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java index 58a592f036..90c100aa72 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java @@ -148,13 +148,10 @@ public class CompactionCoordinatorTest { } @Override - protected void startCompactionCleaner(ScheduledThreadPoolExecutor schedExecutor) {} + protected void startCompactorZKCleaner(ScheduledThreadPoolExecutor schedExecutor) {} @Override - protected void startRunningCleaner(ScheduledThreadPoolExecutor schedExecutor) {} - - @Override - protected void startIdleCompactionWatcher() { + protected void startInternalStateCleaner(ScheduledThreadPoolExecutor schedExecutor) { // This is called from CompactionCoordinator.run(). 
Counting down // the latch will exit the run method this.shutdown.countDown(); @@ -196,6 +193,11 @@ public class CompactionCoordinatorTest { return runningCompactions; } + @Override + protected Map<String,Set<HostAndPort>> getRunningCompactors() { + return Map.of(); + } + @Override protected CompactionMetadata reserveCompaction(MetaJob metaJob, String compactorAddress, ExternalCompactionId externalCompactionId) { @@ -434,13 +436,13 @@ public class CompactionCoordinatorTest { coordinator.getRunning().put(ecid2, new RunningCompaction(new TExternalCompaction())); coordinator.getRunning().put(ecid3, new RunningCompaction(new TExternalCompaction())); - coordinator.cleanUpRunning(); + coordinator.cleanUpInternalState(); assertEquals(Set.of(ecid1, ecid2, ecid3), coordinator.getRunning().keySet()); coordinator.setMetadataCompactionIds(Set.of(ecid1, ecid2)); - coordinator.cleanUpRunning(); + coordinator.cleanUpInternalState(); assertEquals(Set.of(ecid1, ecid2), coordinator.getRunning().keySet());