This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 2158e3bfb457dea9931043e35cb5d4361a7aaf8e
Merge: 26ec56a3e1 7415a252d8
Author: Christopher Tubbs <[email protected]>
AuthorDate: Fri Feb 27 16:23:18 2026 -0500

    Merge branch '2.1'

 .../accumulo/core/logging/ConditionalLogger.java   | 167 +++++++++------------
 .../spi/balancer/HostRegexTableLoadBalancer.java   |   5 +-
 .../core/logging/DeduplicatingLoggerTest.java      |   2 +-
 .../core/logging/EscalatingLoggerTest.java         |   5 +-
 .../server/compaction/CompactionJobGenerator.java  |  15 +-
 .../server/conf/CheckCompactionConfig.java         |  17 +--
 .../org/apache/accumulo/manager/EventQueue.java    |   3 -
 .../accumulo/manager/TabletGroupWatcher.java       |   8 +-
 .../coordinator/CompactionCoordinator.java         |  12 +-
 .../manager/upgrade/UpgradeCoordinator.java        |   3 +-
 .../org/apache/accumulo/tserver/tablet/Tablet.java |   2 +-
 11 files changed, 105 insertions(+), 134 deletions(-)

diff --cc 
core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java
index e391817452,49309dc352..5540eeec08
--- 
a/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java
@@@ -66,9 -62,10 +66,8 @@@ import org.apache.commons.lang3.builder
  import org.apache.commons.lang3.builder.ToStringStyle;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
- import org.slf4j.event.Level;
  
 -import com.google.common.cache.CacheBuilder;
 -import com.google.common.cache.CacheLoader;
 -import com.google.common.cache.LoadingCache;
 +import com.github.benmanes.caffeine.cache.LoadingCache;
  import com.google.common.collect.HashMultimap;
  import com.google.common.collect.Multimap;
  
diff --cc 
server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java
index 6252d4b15d,0000000000..7e812ac872
mode 100644,000000..100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java
@@@ -1,349 -1,0 +1,348 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.server.compaction;
 +
 +import java.time.Duration;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.stream.Collectors;
 +
 +import org.apache.accumulo.core.client.PluginEnvironment;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
 +import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.NamespaceId;
 +import org.apache.accumulo.core.data.TableId;
 +import org.apache.accumulo.core.data.TabletId;
 +import org.apache.accumulo.core.dataImpl.TabletIdImpl;
 +import org.apache.accumulo.core.fate.FateId;
- import org.apache.accumulo.core.logging.ConditionalLogger;
++import org.apache.accumulo.core.logging.ConditionalLogger.EscalatingLogger;
 +import org.apache.accumulo.core.metadata.CompactableFileImpl;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 +import org.apache.accumulo.core.spi.common.ServiceEnvironment;
 +import org.apache.accumulo.core.spi.compaction.CompactionDispatcher;
 +import org.apache.accumulo.core.spi.compaction.CompactionJob;
 +import org.apache.accumulo.core.spi.compaction.CompactionKind;
 +import org.apache.accumulo.core.spi.compaction.CompactionPlan;
 +import org.apache.accumulo.core.spi.compaction.CompactionPlanner;
 +import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
 +import org.apache.accumulo.core.spi.compaction.CompactionServices;
 +import org.apache.accumulo.core.util.cache.Caches;
 +import org.apache.accumulo.core.util.cache.Caches.CacheName;
 +import org.apache.accumulo.core.util.compaction.CompactionJobImpl;
 +import org.apache.accumulo.core.util.compaction.CompactionPlanImpl;
 +import org.apache.accumulo.core.util.compaction.CompactionPlannerInitParams;
 +import org.apache.accumulo.core.util.compaction.CompactionServicesConfig;
 +import org.apache.accumulo.core.util.time.SteadyTime;
 +import org.apache.accumulo.server.ServiceEnvironmentImpl;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
- import org.slf4j.event.Level;
 +
 +import com.github.benmanes.caffeine.cache.Cache;
 +
 +public class CompactionJobGenerator {
 +  private static final Logger log = 
LoggerFactory.getLogger(CompactionJobGenerator.class);
-   private static final Logger UNKNOWN_SERVICE_ERROR_LOG =
-       new ConditionalLogger.EscalatingLogger(log, Duration.ofMinutes(5), 
3000, Level.ERROR);
-   private static final Logger PLANNING_INIT_ERROR_LOG =
-       new ConditionalLogger.EscalatingLogger(log, Duration.ofMinutes(5), 
3000, Level.ERROR);
-   private static final Logger PLANNING_ERROR_LOG =
-       new ConditionalLogger.EscalatingLogger(log, Duration.ofMinutes(5), 
3000, Level.ERROR);
++  private static final EscalatingLogger UNKNOWN_SERVICE_ERROR_LOG =
++      new EscalatingLogger(log, Duration.ofMinutes(5), 3000, Logger::error);
++  private static final EscalatingLogger PLANNING_INIT_ERROR_LOG =
++      new EscalatingLogger(log, Duration.ofMinutes(5), 3000, Logger::error);
++  private static final EscalatingLogger PLANNING_ERROR_LOG =
++      new EscalatingLogger(log, Duration.ofMinutes(5), 3000, Logger::error);
 +
 +  private final CompactionServicesConfig servicesConfig;
 +  private final Map<CompactionServiceId,CompactionPlanner> planners = new 
HashMap<>();
 +  private final Cache<TableId,CompactionDispatcher> dispatchers;
 +  private final Set<CompactionServiceId> serviceIds;
 +  private final PluginEnvironment env;
 +  private final Map<FateId,Map<String,String>> allExecutionHints;
 +  private final SteadyTime steadyTime;
 +
 +  public CompactionJobGenerator(PluginEnvironment env,
 +      Map<FateId,Map<String,String>> executionHints, SteadyTime steadyTime) {
 +    servicesConfig = new CompactionServicesConfig(env.getConfiguration());
 +    serviceIds = 
servicesConfig.getPlanners().keySet().stream().map(CompactionServiceId::of)
 +        .collect(Collectors.toUnmodifiableSet());
 +
 +    dispatchers = 
Caches.getInstance().createNewBuilder(CacheName.COMPACTION_DISPATCHERS, false)
 +        .maximumSize(10).build();
 +    this.env = env;
 +    if (executionHints.isEmpty()) {
 +      this.allExecutionHints = executionHints;
 +    } else {
 +      this.allExecutionHints = new HashMap<>();
 +      // Make the maps that will be passed to plugins unmodifiable. Do this 
once, so it does not
 +      // need to be done for each tablet.
 +      executionHints.forEach((k, v) -> allExecutionHints.put(k,
 +          v.isEmpty() ? Map.of() : Collections.unmodifiableMap(v)));
 +    }
 +
 +    this.steadyTime = steadyTime;
 +  }
 +
 +  public Collection<CompactionJob> generateJobs(TabletMetadata tablet, 
Set<CompactionKind> kinds) {
 +    Collection<CompactionJob> systemJobs = Set.of();
 +
 +    log.trace("Planning for {} {} {}", tablet.getExtent(), kinds, 
this.hashCode());
 +
 +    if (kinds.contains(CompactionKind.SYSTEM)) {
 +      CompactionServiceId serviceId = dispatch(CompactionKind.SYSTEM, tablet, 
Map.of());
 +      systemJobs = planCompactions(serviceId, CompactionKind.SYSTEM, tablet, 
Map.of());
 +    }
 +
 +    Collection<CompactionJob> userJobs = Set.of();
 +
 +    if (kinds.contains(CompactionKind.USER) && tablet.getSelectedFiles() != 
null) {
 +      var hints = 
allExecutionHints.get(tablet.getSelectedFiles().getFateId());
 +      if (hints != null) {
 +        CompactionServiceId serviceId = dispatch(CompactionKind.USER, tablet, 
hints);
 +        userJobs = planCompactions(serviceId, CompactionKind.USER, tablet, 
hints);
 +      }
 +    }
 +
 +    if (userJobs.isEmpty()) {
 +      return systemJobs;
 +    } else if (systemJobs.isEmpty()) {
 +      return userJobs;
 +    } else {
 +      var all = new ArrayList<CompactionJob>(systemJobs.size() + 
userJobs.size());
 +      all.addAll(systemJobs);
 +      all.addAll(userJobs);
 +      return all;
 +    }
 +  }
 +
 +  private CompactionServiceId dispatch(CompactionKind kind, TabletMetadata 
tablet,
 +      Map<String,String> executionHints) {
 +
 +    CompactionDispatcher dispatcher = dispatchers.get(tablet.getTableId(),
 +        tableId -> 
CompactionPluginUtils.createDispatcher((ServiceEnvironment) env, tableId));
 +
 +    CompactionDispatcher.DispatchParameters dispatchParams =
 +        new CompactionDispatcher.DispatchParameters() {
 +          @Override
 +          public CompactionServices getCompactionServices() {
 +            return () -> serviceIds;
 +          }
 +
 +          @Override
 +          public ServiceEnvironment getServiceEnv() {
 +            return (ServiceEnvironment) env;
 +          }
 +
 +          @Override
 +          public CompactionKind getCompactionKind() {
 +            return kind;
 +          }
 +
 +          @Override
 +          public Map<String,String> getExecutionHints() {
 +            return executionHints;
 +          }
 +        };
 +
 +    return dispatcher.dispatch(dispatchParams).getService();
 +  }
 +
 +  private Collection<CompactionJob> planCompactions(CompactionServiceId 
serviceId,
 +      CompactionKind kind, TabletMetadata tablet, Map<String,String> 
executionHints) {
 +
 +    if (!servicesConfig.getPlanners().containsKey(serviceId.canonical())) {
 +      UNKNOWN_SERVICE_ERROR_LOG.trace(
 +          "Table {} returned non-existent compaction service {} for 
compaction type {}.  Check"
 +              + " the table compaction dispatcher configuration. No 
compactions will happen"
 +              + " until the configuration is fixed. This log message is 
temporarily suppressed.",
 +          tablet.getExtent().tableId(), serviceId, kind);
 +      return Set.of();
 +    }
 +
 +    CompactionPlanner planner =
 +        planners.computeIfAbsent(serviceId, sid -> 
createPlanner(tablet.getTableId(), serviceId));
 +
 +    // selecting indicator
 +    // selected files
 +
 +    String ratioStr =
 +        
env.getConfiguration(tablet.getTableId()).get(Property.TABLE_MAJC_RATIO.getKey());
 +    if (ratioStr == null) {
 +      ratioStr = Property.TABLE_MAJC_RATIO.getDefaultValue();
 +    }
 +
 +    double ratio = Double.parseDouble(ratioStr);
 +
 +    Set<CompactableFile> allFiles = tablet.getFilesMap().entrySet().stream()
 +        .map(entry -> new CompactableFileImpl(entry.getKey(), 
entry.getValue()))
 +        .collect(Collectors.toUnmodifiableSet());
 +    Set<CompactableFile> candidates;
 +
 +    if (kind == CompactionKind.SYSTEM) {
 +      if (tablet.getExternalCompactions().isEmpty() && 
tablet.getSelectedFiles() == null) {
 +        candidates = allFiles;
 +      } else {
 +        var tmpFiles = new HashMap<>(tablet.getFilesMap());
 +        // remove any files that are in active compactions
 +        tablet.getExternalCompactions().values().stream().flatMap(ecm -> 
ecm.getJobFiles().stream())
 +            .forEach(tmpFiles::remove);
 +        // remove any files that are selected and the user compaction has 
completed
 +        // at least 1 job, otherwise we can keep the files
 +        var selectedFiles = tablet.getSelectedFiles();
 +
 +        if (selectedFiles != null) {
 +          long selectedExpirationDuration =
 +              
ConfigurationTypeHelper.getTimeInMillis(env.getConfiguration(tablet.getTableId())
 +                  
.get(Property.TABLE_COMPACTION_SELECTION_EXPIRATION.getKey()));
 +
 +          // If jobs are completed, or selected time has not expired, then 
remove
 +          // from the candidate list; otherwise we can cancel the selection
 +          if (selectedFiles.getCompletedJobs() > 0
 +              || (steadyTime.minus(selectedFiles.getSelectedTime()).toMillis()
 +                  < selectedExpirationDuration)) {
 +            tmpFiles.keySet().removeAll(selectedFiles.getFiles());
 +          }
 +        }
 +        candidates = tmpFiles.entrySet().stream()
 +            .map(entry -> new CompactableFileImpl(entry.getKey(), 
entry.getValue()))
 +            .collect(Collectors.toUnmodifiableSet());
 +      }
 +    } else if (kind == CompactionKind.USER) {
 +      var selectedFiles = new HashSet<>(tablet.getSelectedFiles().getFiles());
 +      tablet.getExternalCompactions().values().stream().flatMap(ecm -> 
ecm.getJobFiles().stream())
 +          .forEach(selectedFiles::remove);
 +      candidates = selectedFiles.stream()
 +          .map(file -> new CompactableFileImpl(file, 
tablet.getFilesMap().get(file)))
 +          .collect(Collectors.toUnmodifiableSet());
 +    } else {
 +      throw new UnsupportedOperationException();
 +    }
 +
 +    if (candidates.isEmpty()) {
 +      // there are no candidate files for compaction, so no reason to call 
the planner
 +      return Set.of();
 +    }
 +
 +    CompactionPlanner.PlanningParameters params = new 
CompactionPlanner.PlanningParameters() {
 +
 +      @Override
 +      public NamespaceId getNamespaceId() throws TableNotFoundException {
 +        return ((ServiceEnvironmentImpl) 
env).getContext().getNamespaceId(tablet.getTableId());
 +      }
 +
 +      @Override
 +      public TableId getTableId() {
 +        return tablet.getTableId();
 +      }
 +
 +      @Override
 +      public TabletId getTabletId() {
 +        return new TabletIdImpl(tablet.getExtent());
 +      }
 +
 +      @Override
 +      public ServiceEnvironment getServiceEnvironment() {
 +        return (ServiceEnvironment) env;
 +      }
 +
 +      @Override
 +      public CompactionKind getKind() {
 +        return kind;
 +      }
 +
 +      @Override
 +      public double getRatio() {
 +        return ratio;
 +      }
 +
 +      @Override
 +      public Collection<CompactableFile> getAll() {
 +        return allFiles;
 +      }
 +
 +      @Override
 +      public Collection<CompactableFile> getCandidates() {
 +        return candidates;
 +      }
 +
 +      @Override
 +      public Collection<CompactionJob> getRunningCompactions() {
 +        var allFiles2 = tablet.getFilesMap();
 +        return tablet.getExternalCompactions().values().stream().map(ecMeta 
-> {
 +          Collection<CompactableFile> files = ecMeta.getJobFiles().stream()
 +              .map(f -> new CompactableFileImpl(f, 
allFiles2.get(f))).collect(Collectors.toList());
 +          CompactionJob job = new CompactionJobImpl(ecMeta.getPriority(),
 +              ecMeta.getCompactionGroupId(), files, ecMeta.getKind());
 +          return job;
 +        }).collect(Collectors.toUnmodifiableList());
 +      }
 +
 +      @Override
 +      public Map<String,String> getExecutionHints() {
 +        return executionHints;
 +      }
 +
 +      @Override
 +      public CompactionPlan.Builder createPlanBuilder() {
 +        return new CompactionPlanImpl.BuilderImpl(kind, candidates);
 +      }
 +    };
 +    return planCompactions(planner, params, serviceId);
 +  }
 +
 +  private CompactionPlanner createPlanner(TableId tableId, 
CompactionServiceId serviceId) {
 +
 +    CompactionPlanner planner;
 +    String plannerClassName = null;
 +    Map<String,String> options = null;
 +    try {
 +      plannerClassName = 
servicesConfig.getPlanners().get(serviceId.canonical());
 +      options = servicesConfig.getOptions().get(serviceId.canonical());
 +      planner = env.instantiate(tableId, plannerClassName, 
CompactionPlanner.class);
 +      CompactionPlannerInitParams initParameters = new 
CompactionPlannerInitParams(serviceId,
 +          servicesConfig.getPlannerPrefix(serviceId.canonical()),
 +          servicesConfig.getOptions().get(serviceId.canonical()), 
(ServiceEnvironment) env);
 +      planner.init(initParameters);
 +    } catch (Exception e) {
 +      PLANNING_INIT_ERROR_LOG.trace(
 +          "Failed to create compaction planner for service:{} tableId:{} 
using class:{} options:{}.  Compaction "
 +              + "service will not start any new compactions until its 
configuration is fixed. This log message is "
 +              + "temporarily suppressed.",
 +          serviceId, tableId, plannerClassName, options, e);
 +      planner = new ProvisionalCompactionPlanner(serviceId);
 +    }
 +    return planner;
 +  }
 +
 +  private Collection<CompactionJob> planCompactions(CompactionPlanner planner,
 +      CompactionPlanner.PlanningParameters params, CompactionServiceId 
serviceId) {
 +    try {
 +      return planner.makePlan(params).getJobs();
 +    } catch (Exception e) {
 +      PLANNING_ERROR_LOG.trace(
 +          "Failed to plan compactions for service:{} kind:{} tableId:{} 
hints:{}.  Compaction service may not start any"
 +              + " new compactions until this issue is resolved. Duplicates of 
this log message are temporarily"
 +              + " suppressed.",
 +          serviceId, params.getKind(), params.getTableId(), 
params.getExecutionHints(), e);
 +      return Set.of();
 +    }
 +  }
 +}
diff --cc 
server/base/src/main/java/org/apache/accumulo/server/conf/CheckCompactionConfig.java
index 4d30b2dba4,c02b36edf7..9a0e982a8a
--- 
a/server/base/src/main/java/org/apache/accumulo/server/conf/CheckCompactionConfig.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/conf/CheckCompactionConfig.java
@@@ -21,20 -21,13 +21,21 @@@ package org.apache.accumulo.server.conf
  import static 
org.apache.accumulo.core.Constants.DEFAULT_COMPACTION_SERVICE_NAME;
  
  import java.io.FileNotFoundException;
 +import java.nio.file.Files;
  import java.nio.file.Path;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Map;
 +import java.util.Map.Entry;
  import java.util.Set;
  
 -import org.apache.accumulo.core.cli.Help;
 +import org.apache.accumulo.core.cli.ClientKeywordExecutable;
 +import org.apache.accumulo.core.cli.ClientOpts;
  import org.apache.accumulo.core.conf.AccumuloConfiguration;
  import org.apache.accumulo.core.conf.SiteConfiguration;
 +import org.apache.accumulo.core.data.ResourceGroupId;
  import org.apache.accumulo.core.data.TableId;
++import 
org.apache.accumulo.core.logging.ConditionalLogger.ConditionalLogAction;
  import org.apache.accumulo.core.spi.common.ServiceEnvironment;
  import org.apache.accumulo.core.spi.compaction.CompactionPlanner;
  import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
@@@ -47,9 -37,7 +48,8 @@@ import org.apache.accumulo.start.spi.Co
  import org.apache.accumulo.start.spi.KeywordExecutable;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
- import org.slf4j.event.Level;
  
 +import com.beust.jcommander.JCommander;
  import com.beust.jcommander.Parameter;
  import com.google.auto.service.AutoService;
  
@@@ -100,18 -94,17 +100,17 @@@ public class CheckCompactionConfig exte
      }
  
      AccumuloConfiguration config = 
SiteConfiguration.fromFile(path.toFile()).build();
-     validate(config, Level.INFO);
 -    validate(config);
++    validate(config, Logger::info);
    }
  
-   public static void validate(AccumuloConfiguration config, Level level)
 -  public static void validate(AccumuloConfiguration config)
++  public static void validate(AccumuloConfiguration config, 
ConditionalLogAction logAction)
        throws ReflectiveOperationException, SecurityException, 
IllegalArgumentException {
 -    var servicesConfig = new CompactionServicesConfig(config, log::warn);
 +    var servicesConfig = new CompactionServicesConfig(config);
      ServiceEnvironment senv = createServiceEnvironment(config);
  
 -    Set<String> defaultServices = Set.of(DEFAULT, META, ROOT);
 -    if (servicesConfig.getPlanners().keySet().equals(defaultServices)) {
 -      log.warn("Only the default compaction services were created - {}", 
defaultServices);
 +    Set<String> defaultService = Set.of(DEFAULT_COMPACTION_SERVICE_NAME);
 +    if (servicesConfig.getPlanners().keySet().equals(defaultService)) {
-       log.atLevel(level).log("Only the default compaction service was created 
- {}",
-           defaultService);
++      logAction.log(log, "Only the default compaction service was created - 
{}", defaultService);
        return;
      }
  
@@@ -119,7 -111,8 +118,7 @@@
      for (var entry : servicesConfig.getPlanners().entrySet()) {
        String serviceId = entry.getKey();
        String plannerClassName = entry.getValue();
-       log.atLevel(level).log("Service id: {}, planner class:{}", serviceId, 
plannerClassName);
 -
 -      log.info("Service id: {}, planner class:{}", serviceId, 
plannerClassName);
++      logAction.log(log, "Service id: {}, planner class:{}", serviceId, 
plannerClassName);
  
        Class<? extends CompactionPlanner> plannerClass =
            Class.forName(plannerClassName).asSubclass(CompactionPlanner.class);
@@@ -131,29 -123,19 +130,29 @@@
  
        planner.init(initParams);
  
 -      initParams.getRequestedExecutors()
 -          .forEach((execId, numThreads) -> log.info(
 -              "Compaction service '{}' requested creation of thread pool '{}' 
with {} threads.",
 -              serviceId, execId, numThreads));
 +      initParams.getRequestedGroups().forEach(groupId -> {
-         log.atLevel(level).log("Compaction service '{}' requested with 
compactor group '{}'",
-             serviceId, groupId);
++        logAction.log(log, "Compaction service '{}' requested with compactor 
group '{}'", serviceId,
++            groupId);
 +        groupToServices.computeIfAbsent(groupId, f -> new 
HashSet<>()).add(serviceId);
 +      });
 +    }
  
 -      initParams.getRequestedExternalExecutors()
 -          .forEach(execId -> log.info(
 -              "Compaction service '{}' requested with external execution 
queue '{}'", serviceId,
 -              execId));
 +    boolean dupesFound = false;
 +    for (Entry<ResourceGroupId,Set<String>> e : groupToServices.entrySet()) {
 +      if (e.getValue().size() > 1) {
 +        log.warn("Compaction services " + e.getValue().toString()
 +            + " mapped to the same compactor group: " + e.getKey());
 +        dupesFound = true;
 +      }
 +    }
  
 +    if (dupesFound) {
 +      throw new IllegalStateException(
 +          "Multiple compaction services configured to use the same group. 
This could lead"
 +              + " to undesired behavior. Please fix the configuration");
      }
  
-     log.atLevel(level).log("Properties file has passed all checks.");
 -    log.info("Properties file has passed all checks.");
++    logAction.log(log, "Properties file has passed all checks.");
  
    }
  
diff --cc 
server/manager/src/main/java/org/apache/accumulo/manager/EventQueue.java
index c9698191e5,0000000000..53d22235c8
mode 100644,000000..100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/EventQueue.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/EventQueue.java
@@@ -1,157 -1,0 +1,154 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.manager;
 +
 +import java.util.ArrayList;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.accumulo.core.data.TableId;
 +import org.apache.accumulo.core.dataImpl.KeyExtent;
 +import org.apache.accumulo.core.metadata.schema.Ample;
 +import org.apache.accumulo.core.util.CountDownTimer;
 +import org.apache.accumulo.manager.EventCoordinator.Event;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
 +
 +import com.google.common.base.Preconditions;
 +
 +/**
 + * Event queue that collapses events when possible.
 + */
 +public class EventQueue {
 +
-   private static final Logger log = LoggerFactory.getLogger(EventQueue.class);
 +  private boolean allLevels = false;
 +
 +  private static class Table {
 +    final TableId tableId;
 +    boolean allExtents = false;
 +    Map<KeyExtent,Event> extents = new HashMap<>();
 +
 +    private Table(TableId tableId) {
 +      this.tableId = tableId;
 +    }
 +
 +    public void add(Event event) {
 +      if (allExtents) {
 +        return;
 +      }
 +
 +      if (event.getScope() == EventCoordinator.EventScope.TABLE) {
 +        allExtents = true;
 +        extents.clear();
 +      } else {
 +        Preconditions.checkArgument(event.getScope() == 
EventCoordinator.EventScope.TABLE_RANGE);
 +        extents.put(event.getExtent(), event);
 +        if (extents.size() > 10_000) {
 +          allExtents = true;
 +          extents.clear();
 +        }
 +      }
 +    }
 +
 +    public void fill(List<Event> events) {
 +      if (allExtents) {
 +        events.add(new Event(tableId));
 +      } else {
 +        events.addAll(extents.values());
 +      }
 +    }
 +  }
 +
 +  private static class Level {
 +    final Ample.DataLevel dataLevel;
 +    boolean allTables = false;
 +    Map<TableId,Table> tables = new HashMap<>();
 +
 +    private Level(Ample.DataLevel dataLevel) {
 +      this.dataLevel = dataLevel;
 +    }
 +
 +    void add(Event event) {
 +      if (allTables) {
 +        return;
 +      }
 +
 +      if (event.getScope() == EventCoordinator.EventScope.DATA_LEVEL) {
 +        allTables = true;
 +        tables.clear();
 +      } else {
 +        var table = tables.computeIfAbsent(event.getTableId(), Table::new);
 +        table.add(event);
 +      }
 +    }
 +
 +    public void fill(List<Event> events) {
 +      if (allTables) {
 +        events.add(new Event(dataLevel));
 +      } else {
 +        tables.values().forEach(table -> table.fill(events));
 +      }
 +    }
 +  }
 +
 +  private HashMap<Ample.DataLevel,Level> levels = new HashMap<>();
 +
 +  public synchronized void add(Event event) {
 +    if (allLevels) {
 +      return;
 +    }
 +
 +    if (event.getScope() == EventCoordinator.EventScope.ALL) {
 +      allLevels = true;
 +      levels.clear();
 +    } else {
 +      var level = levels.computeIfAbsent(event.getLevel(), Level::new);
 +      level.add(event);
 +    }
 +    notify();
 +  }
 +
 +  private static final List<Event> ALL_LEVELS = List.of(new Event());
 +
 +  public synchronized List<Event> poll(long duration, TimeUnit timeUnit)
 +      throws InterruptedException {
 +    CountDownTimer timer = CountDownTimer.startNew(duration, timeUnit);
 +    while (!allLevels && levels.isEmpty() && !timer.isExpired()) {
 +      wait(Math.max(1, timer.timeLeft(TimeUnit.MILLISECONDS)));
 +    }
 +
 +    List<Event> events;
 +    if (allLevels) {
 +      events = ALL_LEVELS;
 +    } else {
 +      events = new ArrayList<>();
 +      levels.values().forEach(l -> l.fill(events));
 +    }
 +
 +    // reset back to empty
 +    allLevels = false;
 +    levels.clear();
 +
 +    return events;
 +  }
 +
 +  public List<Event> take() throws InterruptedException {
 +    return poll(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
 +  }
 +}
diff --cc 
server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
index 5fd56b3c2c,45f55169a6..5c6a393126
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
@@@ -94,17 -97,15 +94,16 @@@ import org.apache.accumulo.server.manag
  import org.apache.accumulo.server.manager.state.Assignment;
  import org.apache.accumulo.server.manager.state.ClosableIterator;
  import org.apache.accumulo.server.manager.state.DistributedStoreException;
 -import org.apache.accumulo.server.manager.state.MergeInfo;
 -import org.apache.accumulo.server.manager.state.MergeState;
 +import org.apache.accumulo.server.manager.state.TabletGoalState;
 +import org.apache.accumulo.server.manager.state.TabletManagementIterator;
 +import org.apache.accumulo.server.manager.state.TabletManagementParameters;
  import org.apache.accumulo.server.manager.state.TabletStateStore;
  import org.apache.accumulo.server.manager.state.UnassignedTablet;
 -import org.apache.accumulo.server.tablets.TabletTime;
  import org.apache.hadoop.fs.Path;
 -import org.apache.hadoop.io.Text;
  import org.apache.thrift.TException;
 +import org.apache.zookeeper.KeeperException;
  import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
- import org.slf4j.event.Level;
  
  import com.google.common.base.Preconditions;
  import com.google.common.collect.ImmutableSortedSet;
@@@ -113,11 -114,8 +112,10 @@@ import com.google.common.net.HostAndPor
  
  abstract class TabletGroupWatcher extends AccumuloDaemonThread {
  
 +  private static final Logger LOG = 
LoggerFactory.getLogger(TabletGroupWatcher.class);
 +
-   private static final Logger TABLET_UNLOAD_LOGGER =
-       new EscalatingLogger(Manager.log, Duration.ofMinutes(5), 1000, 
Level.INFO);
- 
+   private static final EscalatingLogger TABLET_UNLOAD_LOGGER =
+       new EscalatingLogger(Manager.log, Duration.ofMinutes(5), 1000, 
Logger::info);
    private final Manager manager;
    private final TabletStateStore store;
    private final TabletGroupWatcher dependentWatcher;
@@@ -225,565 -185,208 +223,565 @@@
      }
    }
  
 -  @Override
 -  public void run() {
 -    int[] oldCounts = new int[TabletState.values().length];
 -    EventCoordinator.Listener eventListener = 
this.manager.nextEvent.getListener();
 +  class EventHandler implements EventCoordinator.Listener {
  
 -    WalStateManager wals = new WalStateManager(manager.getContext());
 +    // Setting this to true to start with because its not know what happended 
before this object was
 +    // created, so just start off with full scan.
 +    private boolean needsFullScan = true;
  
 -    while (manager.stillManager()) {
 -      // slow things down a little, otherwise we spam the logs when there are 
many wake-up events
 -      sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
 +    private final EventQueue eventQueue;
  
 -      final long waitTimeBetweenScans = manager.getConfiguration()
 -          .getTimeInMillis(Property.MANAGER_TABLET_GROUP_WATCHER_INTERVAL);
 +    class RangeProcessor implements Runnable {
 +      @Override
 +      public void run() {
 +        try {
 +          while (manager.stillManager()) {
 +            var events = eventQueue.poll(100, TimeUnit.MILLISECONDS);
  
 -      int totalUnloaded = 0;
 -      int unloaded = 0;
 -      ClosableIterator<TabletLocationState> iter = null;
 -      try {
 -        Map<TableId,MergeStats> mergeStatsCache = new HashMap<>();
 -        Map<TableId,MergeStats> currentMerges = new HashMap<>();
 -        for (MergeInfo merge : manager.merges()) {
 -          if (merge.getExtent() != null) {
 -            currentMerges.put(merge.getExtent().tableId(), new 
MergeStats(merge));
 +            if (events.isEmpty()) {
 +              // check to see if still the manager
 +              continue;
 +            }
 +
 +            EnumSet<EventScope> scopesSeen = EnumSet.noneOf(EventScope.class);
 +            List<Range> ranges = new ArrayList<>(events.size());
 +            for (var event : events) {
 +              scopesSeen.add(event.getScope());
 +              if (event.getScope() == EventScope.TABLE
 +                  || event.getScope() == EventScope.TABLE_RANGE) {
 +                ranges.add(event.getExtent().toMetaRange());
 +              }
 +            }
 +
 +            if (scopesSeen.contains(EventScope.ALL) || 
scopesSeen.contains(EventScope.DATA_LEVEL)) {
 +              // Since this code should only receive events for a single data 
level, and seeing a
 +              // data level should squish all table and tablet events, then 
seeing ranges indicates
 +              // assumptions this code is making are incorrect or there is a 
bug somewhere.
 +              Preconditions.checkState(ranges.isEmpty());
 +              setNeedsFullScan();
 +            } else {
 +              if (!processRanges(ranges)) {
 +                setNeedsFullScan();
 +              }
 +            }
            }
 +        } catch (InterruptedException e) {
 +          throw new RuntimeException(e);
          }
 +      }
 +    }
 +
 +    EventHandler() {
 +      eventQueue = new EventQueue();
 +      Threads.createCriticalThread("TGW [" + store.name() + "] event range 
processor",
 +          new RangeProcessor()).start();
 +    }
 +
 +    private synchronized void setNeedsFullScan() {
 +      needsFullScan = true;
 +      notifyAll();
 +    }
  
 -        // Get the current status for the current list of tservers
 -        SortedMap<TServerInstance,TabletServerStatus> currentTServers = new 
TreeMap<>();
 -        for (TServerInstance entry : manager.tserverSet.getCurrentServers()) {
 -          currentTServers.put(entry, manager.tserverStatus.get(entry));
 +    public synchronized void clearNeedsFullScan() {
 +      needsFullScan = false;
 +    }
 +
 +    public synchronized boolean isNeedsFullScan() {
 +      return needsFullScan;
 +    }
 +
 +    @Override
 +    public void process(Event event) {
 +      eventQueue.add(event);
 +    }
 +
 +    synchronized void waitForFullScan(long millis) {
 +      if (!needsFullScan) {
 +        try {
 +          wait(millis);
 +        } catch (InterruptedException e) {
 +          throw new RuntimeException(e);
          }
 +      }
 +    }
 +  }
  
 -        if (currentTServers.isEmpty()) {
 -          eventListener.waitForEvents(waitTimeBetweenScans);
 -          synchronized (this) {
 -            lastScanServers = Collections.emptySortedSet();
 +  private boolean processRanges(List<Range> ranges) {
 +    if (manager.getManagerGoalState() == ManagerGoalState.CLEAN_STOP) {
 +      return false;
 +    }
 +
 +    TabletManagementParameters tabletMgmtParams = 
createTabletManagementParameters(false);
 +
 +    var currentTservers = 
getCurrentTservers(tabletMgmtParams.getOnlineTsevers());
 +    if (currentTservers.isEmpty()) {
 +      return false;
 +    }
 +
 +    try (var iter = store.iterator(ranges, tabletMgmtParams)) {
 +      long t1 = System.currentTimeMillis();
 +      manageTablets(iter, tabletMgmtParams, currentTservers, false);
 +      long t2 = System.currentTimeMillis();
 +      Manager.log.debug(String.format("[%s]: partial scan time %.2f seconds 
for %,d ranges",
 +          store.name(), (t2 - t1) / 1000., ranges.size()));
 +    } catch (Exception e) {
 +      Manager.log.error("Error processing {} ranges for store {} ", 
ranges.size(), store.name(), e);
 +    }
 +
 +    return true;
 +  }
 +
 +  private final Set<KeyExtent> hostingRequestInProgress = new 
ConcurrentSkipListSet<>();
 +
 +  public void hostOndemand(Collection<KeyExtent> extents) {
 +    // This is only expected to be called for the user level
 +    Preconditions.checkState(getLevel() == Ample.DataLevel.USER);
 +
 +    final List<KeyExtent> inProgress = new ArrayList<>();
 +    extents.forEach(ke -> {
 +      if (hostingRequestInProgress.add(ke)) {
 +        LOG.info("Tablet hosting requested for: {} ", ke);
 +        inProgress.add(ke);
 +      } else {
 +        LOG.trace("Ignoring hosting request because another thread is 
currently processing it {}",
 +            ke);
 +      }
 +    });
 +    // Do not add any code here, it may interfere with the finally block 
removing extents from
 +    // hostingRequestInProgress
 +    try (var mutator = 
manager.getContext().getAmple().conditionallyMutateTablets()) {
 +      inProgress.forEach(ke -> 
mutator.mutateTablet(ke).requireAbsentOperation()
 +          
.requireTabletAvailability(TabletAvailability.ONDEMAND).requireAbsentLocation()
 +          .setHostingRequested()
 +          .submit(TabletMetadata::getHostingRequested, () -> "host 
ondemand"));
 +
 +      List<Range> ranges = new ArrayList<>();
 +
 +      mutator.process().forEach((extent, result) -> {
 +        if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) {
 +          // cache this success for a bit
 +          ranges.add(extent.toMetaRange());
 +        } else {
 +          if (LOG.isTraceEnabled()) {
 +            // only read the metadata if the logging is enabled
 +            LOG.trace("Failed to set hosting request {}", 
result.readMetadata());
            }
 -          continue;
          }
 +      });
 +
 +      if (!ranges.isEmpty()) {
 +        processRanges(ranges);
 +      }
 +    } finally {
 +      inProgress.forEach(hostingRequestInProgress::remove);
 +    }
 +  }
  
 -        TabletLists tLists = new TabletLists(manager, currentTServers);
 +  private TabletManagementParameters
 +      createTabletManagementParameters(boolean 
lookForTabletsNeedingVolReplacement) {
  
 -        RecoveryManager.RecoverySession recoverySession =
 -            manager.recoveryManager.newRecoverySession();
 +    HashMap<Ample.DataLevel,Boolean> parentLevelUpgrade = new HashMap<>();
 +    UpgradeCoordinator.UpgradeStatus upgradeStatus = 
manager.getUpgradeStatus();
 +    for (var level : Ample.DataLevel.values()) {
 +      parentLevelUpgrade.put(level, 
upgradeStatus.isParentLevelUpgraded(level));
 +    }
  
 -        ManagerState managerState = manager.getManagerState();
 -        int[] counts = new int[TabletState.values().length];
 -        stats.begin();
 -        // Walk through the tablets in our store, and work tablets
 -        // towards their goal
 -        iter = store.iterator();
 -        while (iter.hasNext()) {
 -          TabletLocationState tls = iter.next();
 -          if (tls == null) {
 -            continue;
 -          }
 +    Set<TServerInstance> shutdownServers;
 +    if (store.getLevel() == Ample.DataLevel.USER) {
 +      shutdownServers = manager.shutdownServers();
 +    } else {
 +      // Use the servers to shutdown filtered by the dependent watcher. These 
are servers to
 +      // shutdown that the dependent watcher has determined it has no tablets 
hosted on or assigned
 +      // to.
 +      shutdownServers = dependentWatcher.getFilteredServersToShutdown();
 +    }
  
 -          // ignore entries for tables that do not exist in zookeeper
 -          if (manager.getTableManager().getTableState(tls.extent.tableId()) 
== null) {
 -            continue;
 -          }
 +    var tServersSnapshot = manager.tserversSnapshot();
 +
 +    var tabletMgmtParams = new 
TabletManagementParameters(manager.getManagerState(),
 +        parentLevelUpgrade, manager.onlineTables(), tServersSnapshot, 
shutdownServers,
 +        store.getLevel(), manager.getCompactionHints(store.getLevel()), 
canSuspendTablets(),
 +        lookForTabletsNeedingVolReplacement ? 
manager.getContext().getVolumeReplacements()
 +            : Map.of(),
 +        manager.getSteadyTime());
 +
 +    if (LOG.isTraceEnabled()) {
 +      // Log the json that will be passed to iterators to make tablet 
filtering decisions.
 +      LOG.trace("{}:{}", TabletManagementParameters.class.getSimpleName(),
 +          tabletMgmtParams.serialize());
 +    }
 +
 +    return tabletMgmtParams;
 +  }
 +
 +  private Set<TServerInstance> getFilteredServersToShutdown() {
 +    return filteredServersToShutdown;
 +  }
 +
 +  private static class TableMgmtStats {
 +    final int[] counts = new int[TabletState.values().length];
 +    private int totalUnloaded;
 +    private long totalVolumeReplacements;
 +    private int tabletsWithErrors;
 +  }
 +
 +  private TableMgmtStats manageTablets(Iterator<TabletManagement> iter,
 +      TabletManagementParameters tableMgmtParams,
 +      SortedMap<TServerInstance,TabletServerStatus> currentTServers, boolean 
isFullScan)
 +      throws TException, DistributedStoreException, WalMarkerException, 
IOException {
 +
 +    // When upgrading the Manager needs the TabletGroupWatcher
 +    // to assign and balance the root and metadata tables, but
 +    // the Manager does not fully start up until the upgrade
 +    // is complete. This means that objects like the Splitter
 +    // are not going to be initialized and the Coordinator
 +    // is not going to be started.
 +    final boolean currentlyUpgrading = manager.isUpgrading();
 +    if (currentlyUpgrading) {
 +      LOG.debug(
 +          "Currently upgrading, splits and compactions for tables in level {} 
will occur once upgrade is completed.",
 +          store.getLevel());
 +    }
  
 -          // Don't overwhelm the tablet servers with work
 -          if (tLists.unassigned.size() + unloaded
 -              > Manager.MAX_TSERVER_WORK_CHUNK * currentTServers.size()) {
 -            flushChanges(tLists, wals);
 -            tLists.reset();
 -            unloaded = 0;
 -            eventListener.waitForEvents(waitTimeBetweenScans);
 +    final TableMgmtStats tableMgmtStats = new TableMgmtStats();
 +    final boolean shuttingDownAllTabletServers =
 +        
tableMgmtParams.getServersToShutdown().equals(currentTServers.keySet());
 +    if (shuttingDownAllTabletServers && !isFullScan) {
 +      // If we are shutting down all of the TabletServers, then don't process 
any events
 +      // from the EventCoordinator.
 +      LOG.debug("Partial scan requested, but aborted due to shutdown of all 
TabletServers");
 +      return tableMgmtStats;
 +    }
 +
 +    int unloaded = 0;
 +
 +    TabletLists tLists = new TabletLists(currentTServers, 
tableMgmtParams.getGroupedTServers(),
 +        tableMgmtParams.getServersToShutdown());
 +
 +    CompactionJobGenerator compactionGenerator =
 +        new CompactionJobGenerator(new 
ServiceEnvironmentImpl(manager.getContext()),
 +            tableMgmtParams.getCompactionHints(), 
tableMgmtParams.getSteadyTime());
 +
 +    try {
-       CheckCompactionConfig.validate(manager.getConfiguration(), Level.TRACE);
++      CheckCompactionConfig.validate(manager.getConfiguration(), 
Logger::trace);
 +      this.metrics.clearCompactionServiceConfigurationError();
 +    } catch (RuntimeException | ReflectiveOperationException e) {
 +      this.metrics.setCompactionServiceConfigurationError();
 +      LOG.error(
 +          "Error validating compaction configuration, all {} compactions are 
paused until the configuration is fixed.",
 +          store.getLevel(), e);
 +      compactionGenerator = null;
 +    }
 +
 +    Set<TServerInstance> filteredServersToShutdown =
 +        new HashSet<>(tableMgmtParams.getServersToShutdown());
 +
 +    while (iter.hasNext() && !manager.isShutdownRequested()) {
 +      final TabletManagement mti = iter.next();
 +      if (mti == null) {
 +        throw new IllegalStateException("State store returned a null 
ManagerTabletInfo object");
 +      }
 +
 +      final String mtiError = mti.getErrorMessage();
 +      if (mtiError != null) {
 +        LOG.warn(
 +            "Error on TabletServer trying to get Tablet management 
information for metadata tablet. Error message: {}",
 +            mtiError);
 +        this.metrics.incrementTabletGroupWatcherError(this.store.getLevel());
 +        tableMgmtStats.tabletsWithErrors++;
 +        continue;
 +      }
 +
 +      RecoveryManager.RecoverySession recoverySession =
 +          manager.recoveryManager.newRecoverySession();
 +
 +      final TabletMetadata tm = mti.getTabletMetadata();
 +      final TableId tableId = tm.getTableId();
 +      // ignore entries for tables that do not exist in zookeeper
 +      if (manager.getTableManager().getTableState(tableId) == 
TableState.UNKNOWN) {
 +        continue;
 +      }
 +
 +      // Don't overwhelm the tablet servers with work
 +      if (tLists.unassigned.size() + unloaded
 +          > Manager.MAX_TSERVER_WORK_CHUNK * currentTServers.size()
 +          || tLists.volumeReplacements.size() > 1000) {
 +        flushChanges(tLists);
 +        tLists.reset();
 +        unloaded = 0;
 +      }
 +
 +      final TableConfiguration tableConf = 
manager.getContext().getTableConfiguration(tableId);
 +
 +      TabletState state = TabletState.compute(tm, currentTServers.keySet());
 +      if (state == TabletState.ASSIGNED_TO_DEAD_SERVER) {
 +        /*
 +         * This code exists to deal with a race condition caused by two 
threads running in this
 +         * class that compute tablets actions. One thread does full scans and 
the other reacts to
 +         * events and does partial scans. Below is an example of the race 
condition this is
 +         * handling.
 +         *
 +         * - TGW Thread 1 : reads the set of tablets servers and its empty
 +         *
 +         * - TGW Thread 2 : reads the set of tablet servers and its [TS1]
 +         *
 +         * - TGW Thread 2 : Sees tabletX without a location and assigns it to 
TS1
 +         *
 +         * - TGW Thread 1 : Sees tabletX assigned to TS1 and assumes it's 
assigned to a dead tablet
 +         * server because its set of live servers is the empty set.
 +         *
 +         * To deal with this race condition, this code recomputes the tablet 
state using the latest
 +         * tservers when a tablet is seen assigned to a dead tserver.
 +         */
 +
 +        TabletState newState = TabletState.compute(tm, 
manager.tserversSnapshot().getTservers());
 +        if (newState != state) {
 +          LOG.debug("Tablet state changed when using latest set of tservers 
{} {} {}",
 +              tm.getExtent(), state, newState);
 +          state = newState;
 +        }
 +      }
 +      tableMgmtStats.counts[state.ordinal()]++;
 +
 +      // This is final because nothing in this method should change the goal. 
All computation of the
 +      // goal should be done in TabletGoalState.compute() so that all parts 
of the Accumulo code
 +      // will compute a consistent goal.
 +      final TabletGoalState goal = TabletGoalState.compute(tm, state,
 +          manager.getBalanceManager().getBalancer(), tableMgmtParams);
 +
 +      final Set<ManagementAction> actions = mti.getActions();
 +
 +      if (actions.contains(ManagementAction.NEEDS_RECOVERY) && goal != 
TabletGoalState.HOSTED) {
 +        LOG.warn("Tablet has wals, but goal is not hosted. Tablet: {}, 
goal:{}", tm.getExtent(),
 +            goal);
 +      }
 +
 +      if (actions.contains(ManagementAction.NEEDS_VOLUME_REPLACEMENT)) {
 +        tableMgmtStats.totalVolumeReplacements++;
 +        if (state == TabletState.UNASSIGNED || state == 
TabletState.SUSPENDED) {
 +          var volRep =
 +              
VolumeUtil.computeVolumeReplacements(tableMgmtParams.getVolumeReplacements(), 
tm);
 +          if (volRep.logsToRemove.size() + volRep.filesToRemove.size() > 0) {
 +            if (tm.getLocation() != null) {
 +              // since the totalVolumeReplacements counter was incremented, 
should try this again
 +              // later after its unassigned
 +              LOG.debug("Volume replacement needed for {} but it has a 
location {}.",
 +                  tm.getExtent(), tm.getLocation());
 +            } else if (tm.getOperationId() != null) {
 +              LOG.debug("Volume replacement needed for {} but it has an 
active operation {}.",
 +                  tm.getExtent(), tm.getOperationId());
 +            } else {
 +              LOG.debug("Volume replacement needed for {}.", tm.getExtent());
 +              // buffer replacements so that multiple mutations can be done 
at once
 +              tLists.volumeReplacements.add(volRep);
 +            }
 +          } else {
 +            LOG.debug("Volume replacement evaluation for {} returned no 
changes.", tm.getExtent());
            }
 -          TableId tableId = tls.extent.tableId();
 -          TableConfiguration tableConf = 
manager.getContext().getTableConfiguration(tableId);
 -
 -          MergeStats mergeStats = mergeStatsCache.computeIfAbsent(tableId, k 
-> {
 -            var mStats = currentMerges.get(k);
 -            return mStats != null ? mStats : new MergeStats(new MergeInfo());
 -          });
 -          TabletGoalState goal = manager.getGoalState(tls, 
mergeStats.getMergeInfo());
 -          Location location = tls.getLocation();
 -          TabletState state = tls.getState(currentTServers.keySet());
 -
 -          TabletLogger.missassigned(tls.extent, goal.toString(), 
state.toString(),
 -              tls.getFutureServer(), tls.getCurrentServer(), 
tls.walogs.size());
 -
 -          stats.update(tableId, state);
 -          mergeStats.update(tls.extent, state, tls.chopped, 
!tls.walogs.isEmpty());
 -          sendChopRequest(mergeStats.getMergeInfo(), state, tls);
 -          sendSplitRequest(mergeStats.getMergeInfo(), state, tls);
 -
 -          // Always follow through with assignments
 -          if (state == TabletState.ASSIGNED) {
 -            goal = TabletGoalState.HOSTED;
 +        } else {
 +          LOG.debug("Volume replacement needed for {} but its tablet state is 
{}.", tm.getExtent(),
 +              state);
 +        }
 +      }
 +
 +      if (actions.contains(ManagementAction.BAD_STATE) && 
tm.isFutureAndCurrentLocationSet()) {
 +        Manager.log.error("{}, saw tablet with multiple locations, which 
should not happen",
 +            tm.getExtent());
 +        logIncorrectTabletLocations(tm);
 +        // take no further action for this tablet
 +        continue;
 +      }
 +
 +      final Location location = tm.getLocation();
 +      Location current = null;
 +      Location future = null;
 +      if (tm.hasCurrent()) {
 +        current = tm.getLocation();
 +      } else {
 +        future = tm.getLocation();
 +      }
 +      TabletLogger.missassigned(tm.getExtent(), goal.toString(), 
state.toString(),
 +          future != null ? future.getServerInstance() : null,
 +          current != null ? current.getServerInstance() : null, 
tm.getLogs().size());
 +
 +      if (isFullScan) {
 +        stats.update(tableId, state);
 +      }
 +
 +      if (Manager.log.isTraceEnabled()) {
 +        Manager.log.trace(
 +            "[{}] Shutting down all Tservers: {}, dependentCount: {} Extent: 
{}, state: {}, goal: {} actions:{} #wals:{}",
 +            store.name(), 
tableMgmtParams.getServersToShutdown().equals(currentTServers.keySet()),
 +            dependentWatcher == null ? "null" : 
dependentWatcher.assignedOrHosted(), tm.getExtent(),
 +            state, goal, actions, tm.getLogs().size());
 +      }
 +
 +      final boolean needsSplit = 
actions.contains(ManagementAction.NEEDS_SPLITTING);
 +      if (!currentlyUpgrading && needsSplit) {
 +        LOG.debug("{} may need splitting.", tm.getExtent());
 +        manager.getSplitter().initiateSplit(tm.getExtent());
 +      }
 +
 +      if (!currentlyUpgrading && 
actions.contains(ManagementAction.NEEDS_COMPACTING)
 +          && compactionGenerator != null) {
 +        // Check if tablet needs splitting, priority should be giving to 
splits over
 +        // compactions because it's best to compact after a split
 +        if (!needsSplit) {
 +          var jobs = compactionGenerator.generateJobs(tm,
 +              TabletManagementIterator.determineCompactionKinds(actions));
 +          LOG.debug("{} may need compacting adding {} jobs", tm.getExtent(), 
jobs.size());
 +          manager.getCompactionCoordinator().addJobs(tm, jobs);
 +        } else {
 +          LOG.trace("skipping compaction job generation because {} may need 
splitting.",
 +              tm.getExtent());
 +        }
 +      }
 +
 +      if (actions.contains(ManagementAction.NEEDS_LOCATION_UPDATE)
 +          || actions.contains(ManagementAction.NEEDS_RECOVERY)) {
 +
 +        if (tm.getLocation() != null) {
 +          
filteredServersToShutdown.remove(tm.getLocation().getServerInstance());
 +        }
 +
 +        if (goal == TabletGoalState.HOSTED) {
 +
 +          // RecoveryManager.recoverLogs will return false when all of the 
logs
 +          // have been sorted so that recovery can occur. Delay the hosting of
 +          // the Tablet until the sorting is finished.
 +          if ((state != TabletState.HOSTED && 
actions.contains(ManagementAction.NEEDS_RECOVERY))
 +              && recoverySession.recoverLogs(tm.getLogs())) {
 +            LOG.debug("Not hosting {} as it needs recovery, logs: {}", 
tm.getExtent(),
 +                tm.getLogs().size());
 +            continue;
            }
 -          if (Manager.log.isTraceEnabled()) {
 -            Manager.log.trace(
 -                "[{}] Shutting down all Tservers: {}, dependentCount: {} 
Extent: {}, state: {}, goal: {}",
 -                store.name(), 
manager.serversToShutdown.equals(currentTServers.keySet()),
 -                dependentWatcher == null ? "null" : 
dependentWatcher.assignedOrHosted(), tls.extent,
 -                state, goal);
 +          switch (state) {
 +            case ASSIGNED_TO_DEAD_SERVER:
 +              hostDeadTablet(tLists, tm, location);
 +              break;
 +            case SUSPENDED:
 +              hostSuspendedTablet(tLists, tm, location, tableConf);
 +              break;
 +            case UNASSIGNED:
 +              hostUnassignedTablet(tLists, tm.getExtent(),
 +                  new UnassignedTablet(location, tm.getLast()));
 +              break;
 +            case ASSIGNED:
 +              // Send another reminder
 +              tLists.assigned.add(new Assignment(tm.getExtent(),
 +                  future != null ? future.getServerInstance() : null, 
tm.getLast()));
 +              break;
 +            case HOSTED:
 +              break;
            }
 -
 -          // if we are shutting down all the tabletservers, we have to do it 
in order
 -          if ((goal == TabletGoalState.SUSPENDED && state == 
TabletState.HOSTED)
 -              && manager.serversToShutdown.equals(currentTServers.keySet())) {
 -            if (dependentWatcher != null) {
 -              // If the dependentWatcher is for the user tables, check to see
 -              // that user tables exist.
 -              DataLevel dependentLevel = dependentWatcher.store.getLevel();
 -              boolean userTablesExist = true;
 -              switch (dependentLevel) {
 -                case USER:
 -                  Set<TableId> onlineTables = manager.onlineTables();
 -                  onlineTables.remove(RootTable.ID);
 -                  onlineTables.remove(MetadataTable.ID);
 -                  userTablesExist = !onlineTables.isEmpty();
 -                  break;
 -                case METADATA:
 -                case ROOT:
 -                default:
 -                  break;
 -              }
 -              // If the stats object in the dependentWatcher is empty, then it
 -              // currently does not have data about what is hosted or not. In
 -              // that case host these tablets until the dependent watcher can
 -              // gather some data.
 -              final Map<TableId,TableCounts> stats = 
dependentWatcher.getStats();
 -              if (dependentLevel == DataLevel.USER) {
 -                if (userTablesExist
 -                    && (stats == null || stats.isEmpty() || 
assignedOrHosted(stats) > 0)) {
 -                  goal = TabletGoalState.HOSTED;
 -                }
 -              } else if (stats == null || stats.isEmpty() || 
assignedOrHosted(stats) > 0) {
 -                goal = TabletGoalState.HOSTED;
 +        } else {
 +          switch (state) {
 +            case SUSPENDED:
 +              // Request a move to UNASSIGNED, so as to allow balancing to 
continue.
 +              tLists.suspendedToGoneServers.add(tm);
 +              break;
 +            case ASSIGNED_TO_DEAD_SERVER:
 +              unassignDeadTablet(tLists, tm);
 +              break;
 +            case HOSTED:
 +              TServerConnection client =
 +                  
manager.tserverSet.getConnection(location.getServerInstance());
 +              if (client != null) {
 +                TABLET_UNLOAD_LOGGER.trace("[{}] Requesting TabletServer {} 
unload {} {}",
 +                    store.name(), location.getServerInstance(), 
tm.getExtent(), goal.howUnload());
 +                client.unloadTablet(manager.managerLock, tm.getExtent(), 
goal.howUnload(),
 +                    manager.getSteadyTime().getMillis());
 +                tableMgmtStats.totalUnloaded++;
 +                unloaded++;
 +              } else {
 +                Manager.log.warn("Could not connect to server {}", location);
                }
 -            }
 +              break;
 +            case ASSIGNED:
 +            case UNASSIGNED:
 +              break;
            }
 +        }
 +      }
 +    }
  
 -          if (goal == TabletGoalState.HOSTED) {
 -            if ((state != TabletState.HOSTED && !tls.walogs.isEmpty())
 -                && recoverySession.recoverLogs(tls.walogs)) {
 -              continue;
 -            }
 -            switch (state) {
 -              case HOSTED:
 -                if 
(location.getServerInstance().equals(manager.migrations.get(tls.extent))) {
 -                  manager.migrations.removeExtent(tls.extent);
 -                }
 -                break;
 -              case ASSIGNED_TO_DEAD_SERVER:
 -                hostDeadTablet(tLists, tls, location, wals);
 -                break;
 -              case SUSPENDED:
 -                hostSuspendedTablet(tLists, tls, location, tableConf);
 -                break;
 -              case UNASSIGNED:
 -                hostUnassignedTablet(tLists, tls.extent, new 
UnassignedTablet(location, tls.last));
 -                break;
 -              case ASSIGNED:
 -                // Send another reminder
 -                tLists.assigned.add(new Assignment(tls.extent, 
tls.getFutureServer(), tls.last));
 -                break;
 -            }
 -          } else {
 -            switch (state) {
 -              case SUSPENDED:
 -                // Request a move to UNASSIGNED, so as to allow balancing to 
continue.
 -                tLists.suspendedToGoneServers.add(tls);
 -                cancelOfflineTableMigrations(tls.extent);
 -                break;
 -              case UNASSIGNED:
 -                cancelOfflineTableMigrations(tls.extent);
 -                break;
 -              case ASSIGNED_TO_DEAD_SERVER:
 -                unassignDeadTablet(tLists, tls, wals);
 -                break;
 -              case HOSTED:
 -                TServerConnection client =
 -                    
manager.tserverSet.getConnection(location.getServerInstance());
 -                if (client != null) {
 -                  try {
 -                    TABLET_UNLOAD_LOGGER.trace("[{}] Requesting TabletServer 
{} unload {} {}",
 -                        store.name(), location.getServerInstance(), 
tls.extent, goal.howUnload());
 -                    client.unloadTablet(manager.managerLock, tls.extent, 
goal.howUnload(),
 -                        manager.getSteadyTime());
 -                    unloaded++;
 -                    totalUnloaded++;
 -                  } catch (TException tException) {
 -                    Manager.log.warn("[{}] Failed to request tablet unload {} 
{} {}", store.name(),
 -                        location.getServerInstance(), tls.extent, 
goal.howUnload(), tException);
 -                  }
 -                } else {
 -                  Manager.log.warn("Could not connect to server {}", 
location);
 -                }
 -                break;
 -              case ASSIGNED:
 -                break;
 -            }
 +    flushChanges(tLists);
 +
 +    if (isFullScan) {
 +      this.filteredServersToShutdown = Set.copyOf(filteredServersToShutdown);
 +    }
 +
 +    return tableMgmtStats;
 +  }
 +
 +  private SortedMap<TServerInstance,TabletServerStatus>
 +      getCurrentTservers(Set<TServerInstance> onlineTservers) {
 +    // Get the current status for the current list of tservers
 +    final SortedMap<TServerInstance,TabletServerStatus> currentTServers = new 
TreeMap<>();
 +    for (TServerInstance entry : onlineTservers) {
 +      currentTServers.put(entry, 
manager.getTserverStatus().status.get(entry));
 +    }
 +    return currentTServers;
 +  }
 +
 +  @Override
 +  public void run() {
 +    int[] oldCounts = new int[TabletState.values().length];
 +    boolean lookForTabletsNeedingVolReplacement = true;
 +
 +    while (manager.stillManager() && !manager.isShutdownRequested()) {
 +      if (!eventHandler.isNeedsFullScan()) {
 +        // If an event handled by the EventHandler.RangeProcessor indicated
 +        // that we need to do a full scan, then do it. Otherwise wait a bit
 +        // before re-checking the tablets.
 +        sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
 +      }
 +
 +      final long waitTimeBetweenScans = manager.getConfiguration()
 +          .getTimeInMillis(Property.MANAGER_TABLET_GROUP_WATCHER_INTERVAL);
 +
 +      TabletManagementParameters tableMgmtParams =
 +          
createTabletManagementParameters(lookForTabletsNeedingVolReplacement);
 +      var currentTServers = 
getCurrentTservers(tableMgmtParams.getOnlineTsevers());
 +
 +      ClosableIterator<TabletManagement> iter = null;
 +      try {
 +        if (currentTServers.isEmpty()) {
 +          eventHandler.waitForFullScan(waitTimeBetweenScans);
 +          synchronized (this) {
 +            lastScanServers = Collections.emptySortedSet();
            }
 -          counts[state.ordinal()]++;
 +          continue;
          }
  
 -        flushChanges(tLists, wals);
 +        stats.begin();
 +
 +        ManagerState managerState = tableMgmtParams.getManagerState();
 +
 +        // Clear the need for a full scan before starting a full scan inorder 
to detect events that
 +        // happen during the full scan.
 +        eventHandler.clearNeedsFullScan();
 +
 +        iter = store.iterator(tableMgmtParams);
 +        
manager.getCompactionCoordinator().getJobQueues().beginFullScan(store.getLevel());
 +        var tabletMgmtStats = manageTablets(iter, tableMgmtParams, 
currentTServers, true);
 +        
manager.getCompactionCoordinator().getJobQueues().endFullScan(store.getLevel());
 +
 +        // If currently looking for volume replacements, determine if the 
next round needs to look.
 +        if (lookForTabletsNeedingVolReplacement) {
 +          // Continue to look for tablets needing volume replacement if there 
was an error
 +          // processing tablets in the call to manageTablets() or if we are 
still performing volume
 +          // replacement. We only want to stop looking for tablets that need 
volume replacement when
 +          // we have successfully processed all tablet metadata and no more 
volume replacements are
 +          // being performed.
 +          Manager.log.debug("[{}] saw {} tablets needing volume replacement", 
store.name(),
 +              tabletMgmtStats.totalVolumeReplacements);
 +          lookForTabletsNeedingVolReplacement = 
tabletMgmtStats.totalVolumeReplacements != 0
 +              || tabletMgmtStats.tabletsWithErrors != 0;
 +          if (!lookForTabletsNeedingVolReplacement) {
 +            Manager.log.debug("[{}] no longer looking for volume 
replacements", store.name());
 +          }
 +        }
  
          // provide stats after flushing changes to avoid race conditions w/ 
delete table
          stats.end(managerState);
diff --cc 
server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index d6915b5155,0000000000..57d339485b
mode 100644,000000..100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@@ -1,1428 -1,0 +1,1426 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.manager.compaction.coordinator;
 +
 +import static java.util.concurrent.TimeUnit.SECONDS;
 +import static java.util.stream.Collectors.toList;
 +import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.COMPACTED;
 +import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.ECOMP;
 +import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
 +import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
 +import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID;
 +import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
 +import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED;
 +
 +import java.io.FileNotFoundException;
 +import java.io.IOException;
 +import java.io.UncheckedIOException;
 +import java.time.Duration;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.Comparator;
 +import java.util.EnumMap;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Objects;
 +import java.util.Optional;
 +import java.util.Set;
 +import java.util.concurrent.CompletableFuture;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.ConcurrentSkipListSet;
 +import java.util.concurrent.CountDownLatch;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.ScheduledFuture;
 +import java.util.concurrent.ScheduledThreadPoolExecutor;
 +import java.util.concurrent.ThreadPoolExecutor;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.function.Consumer;
 +import java.util.function.Function;
 +import java.util.function.Supplier;
 +import java.util.stream.Collectors;
 +import java.util.stream.Stream;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.TableDeletedException;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.client.admin.CompactionConfig;
 +import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
 +import org.apache.accumulo.core.client.admin.servers.ServerId;
 +import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode;
 +import org.apache.accumulo.core.clientImpl.thrift.TInfo;
 +import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
 +import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
 +import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
 +import 
org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException;
 +import 
org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService;
 +import org.apache.accumulo.core.compaction.thrift.TCompactionState;
 +import org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate;
 +import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
 +import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList;
 +import org.apache.accumulo.core.compaction.thrift.TExternalCompactionMap;
 +import org.apache.accumulo.core.compaction.thrift.TNextCompactionJob;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.NamespaceId;
 +import org.apache.accumulo.core.data.ResourceGroupId;
 +import org.apache.accumulo.core.data.TableId;
 +import org.apache.accumulo.core.dataImpl.KeyExtent;
 +import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
 +import org.apache.accumulo.core.fate.Fate;
 +import org.apache.accumulo.core.fate.FateClient;
 +import org.apache.accumulo.core.fate.FateId;
 +import org.apache.accumulo.core.fate.FateInstanceType;
 +import org.apache.accumulo.core.fate.FateKey;
 +import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
 +import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil;
++import 
org.apache.accumulo.core.logging.ConditionalLogger.ConditionalLogAction;
 +import org.apache.accumulo.core.logging.TabletLogger;
 +import org.apache.accumulo.core.manager.state.tables.TableState;
 +import org.apache.accumulo.core.metadata.CompactableFileImpl;
 +import org.apache.accumulo.core.metadata.ReferencedTabletFile;
 +import org.apache.accumulo.core.metadata.StoredTabletFile;
 +import org.apache.accumulo.core.metadata.schema.Ample;
 +import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
 +import org.apache.accumulo.core.metadata.schema.Ample.RejectionHandler;
 +import org.apache.accumulo.core.metadata.schema.CompactionMetadata;
 +import org.apache.accumulo.core.metadata.schema.DataFileValue;
 +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
 +import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 +import 
org.apache.accumulo.core.metadata.schema.filters.HasExternalCompactionsFilter;
 +import org.apache.accumulo.core.metrics.MetricsProducer;
 +import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
 +import org.apache.accumulo.core.spi.compaction.CompactionJob;
 +import org.apache.accumulo.core.spi.compaction.CompactionKind;
 +import org.apache.accumulo.core.spi.compaction.CompactionPlanner;
 +import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
 +import org.apache.accumulo.core.tabletserver.thrift.InputFile;
 +import org.apache.accumulo.core.tabletserver.thrift.IteratorConfig;
 +import org.apache.accumulo.core.tabletserver.thrift.TCompactionKind;
 +import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats;
 +import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
 +import org.apache.accumulo.core.util.cache.Caches.CacheName;
 +import org.apache.accumulo.core.util.compaction.CompactionPlannerInitParams;
 +import org.apache.accumulo.core.util.compaction.CompactionServicesConfig;
 +import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
 +import org.apache.accumulo.core.util.compaction.RunningCompaction;
 +import org.apache.accumulo.core.util.threads.ThreadPools;
 +import org.apache.accumulo.core.util.threads.Threads;
 +import org.apache.accumulo.core.volume.Volume;
 +import org.apache.accumulo.manager.Manager;
 +import 
org.apache.accumulo.manager.compaction.coordinator.commit.CommitCompaction;
 +import 
org.apache.accumulo.manager.compaction.coordinator.commit.CompactionCommitData;
 +import 
org.apache.accumulo.manager.compaction.coordinator.commit.RenameCompactionFile;
 +import 
org.apache.accumulo.manager.compaction.queue.CompactionJobPriorityQueue;
 +import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues;
 +import org.apache.accumulo.manager.compaction.queue.ResolvedCompactionJob;
 +import org.apache.accumulo.manager.tableOps.FateEnv;
 +import org.apache.accumulo.server.ServerContext;
 +import org.apache.accumulo.server.ServiceEnvironmentImpl;
 +import org.apache.accumulo.server.compaction.CompactionConfigStorage;
 +import org.apache.accumulo.server.compaction.CompactionPluginUtils;
 +import org.apache.accumulo.server.security.AuditedSecurityOperation;
 +import org.apache.accumulo.server.tablets.TabletNameGenerator;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.thrift.TException;
 +import org.apache.zookeeper.KeeperException;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
- import org.slf4j.event.Level;
 +
 +import com.github.benmanes.caffeine.cache.Cache;
 +import com.github.benmanes.caffeine.cache.CacheLoader;
 +import com.github.benmanes.caffeine.cache.LoadingCache;
 +import com.github.benmanes.caffeine.cache.Weigher;
 +import com.google.common.base.Preconditions;
 +import com.google.common.base.Suppliers;
 +import com.google.common.collect.Sets;
 +import com.google.common.net.HostAndPort;
 +
 +import io.micrometer.core.instrument.MeterRegistry;
 +
 +public class CompactionCoordinator
 +    implements CompactionCoordinatorService.Iface, Runnable, MetricsProducer {
 +
 +  // Object that serves as a TopN view of the RunningCompactions, ordered by
 +  // RunningCompaction start time. The first entry in this Set should be the
 +  // oldest RunningCompaction.
 +  public static class TimeOrderedRunningCompactionSet {
 +
 +    private static final int UPPER_LIMIT = 50;
 +
 +    Comparator<RunningCompaction> oldestFirstComparator =
 +        Comparator.comparingLong(RunningCompaction::getStartTime)
 +            .thenComparing(rc -> rc.getJob().getExternalCompactionId());
 +    private final ConcurrentSkipListSet<RunningCompaction> compactions =
 +        new ConcurrentSkipListSet<>(oldestFirstComparator);
 +
 +    // Tracking size here as ConcurrentSkipListSet.size() is not constant time
 +    private final AtomicInteger size = new AtomicInteger(0);
 +
 +    public int size() {
 +      return size.get();
 +    }
 +
 +    public boolean add(RunningCompaction e) {
 +      boolean added = compactions.add(e);
 +      if (added) {
 +        if (size.incrementAndGet() > UPPER_LIMIT) {
 +          this.remove(compactions.last());
 +        }
 +      }
 +      return added;
 +    }
 +
 +    public boolean remove(Object o) {
 +      boolean removed = compactions.remove(o);
 +      if (removed) {
 +        size.decrementAndGet();
 +      }
 +      return removed;
 +    }
 +
 +    public Iterator<RunningCompaction> iterator() {
 +      return compactions.iterator();
 +    }
 +
 +    public Stream<RunningCompaction> stream() {
 +      return compactions.stream();
 +    }
 +
 +  }
 +
 +  static class FailureCounts {
 +    long failures;
 +    long successes;
 +
 +    FailureCounts(long failures, long successes) {
 +      this.failures = failures;
 +      this.successes = successes;
 +    }
 +
 +    static FailureCounts incrementFailure(Object key, FailureCounts counts) {
 +      if (counts == null) {
 +        return new FailureCounts(1, 0);
 +      }
 +      counts.failures++;
 +      return counts;
 +    }
 +
 +    static FailureCounts incrementSuccess(Object key, FailureCounts counts) {
 +      if (counts == null) {
 +        return new FailureCounts(0, 1);
 +      }
 +      counts.successes++;
 +      return counts;
 +    }
 +  }
 +
 +  private final ConcurrentHashMap<ResourceGroupId,FailureCounts> 
failingQueues =
 +      new ConcurrentHashMap<>();
 +  private final ConcurrentHashMap<String,FailureCounts> failingCompactors =
 +      new ConcurrentHashMap<>();
 +  private final ConcurrentHashMap<TableId,FailureCounts> failingTables = new 
ConcurrentHashMap<>();
 +
 +  private static final Logger LOG = 
LoggerFactory.getLogger(CompactionCoordinator.class);
 +
 +  public static final String RESTART_UPDATE_MSG =
 +      "Coordinator restarted, compaction found in progress";
 +
 +  /*
 +   * Map of compactionId to RunningCompactions. This is an informational 
cache of what external
 +   * compactions may be running. Its possible it may contain external 
compactions that are not
 +   * actually running. It may not contain compactions that are actually 
running. The metadata table
 +   * is the most authoritative source of what external compactions are 
currently running, but it
 +   * does not have the stats that this map has.
 +   */
 +  protected final Map<ExternalCompactionId,RunningCompaction> RUNNING_CACHE =
 +      new ConcurrentHashMap<>();
 +
 +  protected final Map<String,TimeOrderedRunningCompactionSet> 
LONG_RUNNING_COMPACTIONS_BY_RG =
 +      new ConcurrentHashMap<>();
 +
 +  /* Map of group name to last time compactor called to get a compaction job 
*/
 +  private final Map<ResourceGroupId,Long> TIME_COMPACTOR_LAST_CHECKED = new 
ConcurrentHashMap<>();
 +
 +  private final ServerContext ctx;
 +  private final AuditedSecurityOperation security;
 +  private final CompactionJobQueues jobQueues;
 +  private final Function<FateInstanceType,FateClient<FateEnv>> fateClients;
 +  // Exposed for tests
 +  protected final CountDownLatch shutdown = new CountDownLatch(1);
 +
 +  private final Cache<ExternalCompactionId,RunningCompaction> completed;
 +  private final LoadingCache<FateId,CompactionConfig> compactionConfigCache;
 +  private final Cache<Path,Integer> tabletDirCache;
 +  private final DeadCompactionDetector deadCompactionDetector;
 +
 +  private final QueueMetrics queueMetrics;
 +  private final Manager manager;
 +
 +  private final LoadingCache<ResourceGroupId,Integer> compactorCounts;
 +
 +  private volatile long coordinatorStartTime;
 +
 +  private final Map<DataLevel,ThreadPoolExecutor> reservationPools;
 +  private final Set<String> activeCompactorReservationRequest = 
ConcurrentHashMap.newKeySet();
 +
  /**
   * Creates the coordinator and wires up its queues, caches, and reservation thread pools. No
   * background threads are started here; {@code start()} launches the service thread.
   *
   * @param manager the owning manager, used for context, security, and steady time
   * @param fateClients supplies a Fate client for a given Fate instance type (user vs meta)
   */
  public CompactionCoordinator(Manager manager,
      Function<FateInstanceType,FateClient<FateEnv>> fateClients) {
    this.ctx = manager.getContext();
    this.security = ctx.getSecurityOperation();
    this.manager = Objects.requireNonNull(manager);

    long jobQueueMaxSize =
        ctx.getConfiguration().getAsBytes(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE);

    this.jobQueues = new CompactionJobQueues(jobQueueMaxSize);

    this.queueMetrics = new QueueMetrics(jobQueues);

    this.fateClients = fateClients;

    // Short-lived cache of recently completed compactions, consulted by status RPCs.
    completed = ctx.getCaches().createNewBuilder(CacheName.COMPACTIONS_COMPLETED, true)
        .maximumSize(200).expireAfterWrite(10, TimeUnit.MINUTES).build();

    CacheLoader<FateId,CompactionConfig> loader =
        fateId -> CompactionConfigStorage.getConfig(ctx, fateId);

    // Keep a small short lived cache of compaction config. Compaction config never changes, however
    // when a compaction is canceled it is deleted which is why there is a time limit. It does not
    // hurt to let a job that was canceled start, it will be canceled later. Caching this immutable
    // config will help avoid reading the same data over and over.
    compactionConfigCache = ctx.getCaches().createNewBuilder(CacheName.COMPACTION_CONFIGS, true)
        .expireAfterWrite(30, SECONDS).maximumSize(100).build(loader);

    // Weight tablet-dir cache entries by path length so the cache is bounded by memory, not count.
    Weigher<Path,Integer> weigher = (path, count) -> {
      return path.toUri().toString().length();
    };

    tabletDirCache = ctx.getCaches().createNewBuilder(CacheName.COMPACTION_DIR_CACHE, true)
        .maximumWeight(10485760L).weigher(weigher).build();

    deadCompactionDetector =
        new DeadCompactionDetector(this.ctx, this, ctx.getScheduledExecutor(), fateClients);

    // Separate reservation pools per data level so root/metadata reservations are never
    // starved by user-table reservations.
    var rootReservationPool = ThreadPools.getServerThreadPools().createExecutorService(
        ctx.getConfiguration(), Property.COMPACTION_COORDINATOR_RESERVATION_THREADS_ROOT, true);

    var metaReservationPool = ThreadPools.getServerThreadPools().createExecutorService(
        ctx.getConfiguration(), Property.COMPACTION_COORDINATOR_RESERVATION_THREADS_META, true);

    var userReservationPool = ThreadPools.getServerThreadPools().createExecutorService(
        ctx.getConfiguration(), Property.COMPACTION_COORDINATOR_RESERVATION_THREADS_USER, true);

    reservationPools = Map.of(Ample.DataLevel.ROOT, rootReservationPool, Ample.DataLevel.METADATA,
        metaReservationPool, Ample.DataLevel.USER, userReservationPool);

    compactorCounts = ctx.getCaches().createNewBuilder(CacheName.COMPACTOR_COUNTS, false)
        .expireAfterWrite(2, TimeUnit.MINUTES).build(this::countCompactors);
    // At this point the manager does not have its lock so no actions should be taken yet
  }
 +
  /**
   * Counts the compactors currently registered for the given resource group. Protected so tests
   * can override; results are cached in {@code compactorCounts}.
   */
  protected int countCompactors(ResourceGroupId groupName) {
    return ExternalCompactionUtil.countCompactors(groupName, ctx);
  }
 +
 +  private volatile Thread serviceThread = null;
 +
 +  public void start() {
 +    serviceThread = Threads.createCriticalThread("CompactionCoordinator 
Thread", this);
 +    serviceThread.start();
 +  }
 +
 +  public void shutdown() {
 +    shutdown.countDown();
 +
 +    reservationPools.values().forEach(ExecutorService::shutdownNow);
 +
 +    var localThread = serviceThread;
 +    if (localThread != null) {
 +      try {
 +        localThread.join();
 +      } catch (InterruptedException e) {
 +        LOG.error("Exception stopping compaction coordinator thread", e);
 +      }
 +    }
 +  }
 +
 +  protected void startCompactorZKCleaner(ScheduledThreadPoolExecutor 
schedExecutor) {
 +    ScheduledFuture<?> future = schedExecutor
 +        .scheduleWithFixedDelay(this::cleanUpEmptyCompactorPathInZK, 0, 5, 
TimeUnit.MINUTES);
 +    ThreadPools.watchNonCriticalScheduledTask(future);
 +  }
 +
 +  protected void startInternalStateCleaner(ScheduledThreadPoolExecutor 
schedExecutor) {
 +    ScheduledFuture<?> future =
 +        schedExecutor.scheduleWithFixedDelay(this::cleanUpInternalState, 0, 
5, TimeUnit.MINUTES);
 +    ThreadPools.watchNonCriticalScheduledTask(future);
 +  }
 +
 +  protected void startConfigMonitor(ScheduledThreadPoolExecutor 
schedExecutor) {
 +    ScheduledFuture<?> future =
 +        schedExecutor.scheduleWithFixedDelay(this::checkForConfigChanges, 0, 
1, TimeUnit.MINUTES);
 +    ThreadPools.watchNonCriticalScheduledTask(future);
 +  }
 +
 +  private void checkForConfigChanges() {
 +    long jobQueueMaxSize =
 +        
ctx.getConfiguration().getAsBytes(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE);
 +    jobQueues.resetMaxSize(jobQueueMaxSize);
 +  }
 +
 +  @Override
 +  public void run() {
 +
 +    this.coordinatorStartTime = System.currentTimeMillis();
 +    startConfigMonitor(ctx.getScheduledExecutor());
 +    startCompactorZKCleaner(ctx.getScheduledExecutor());
 +
 +    // On a re-start of the coordinator it's possible that external 
compactions are in-progress.
 +    // Attempt to get the running compactions on the compactors and then 
resolve which tserver
 +    // the external compaction came from to re-populate the RUNNING 
collection.
 +    LOG.info("Checking for running external compactions");
 +    // On re-start contact the running Compactors to try and seed the list of 
running compactions
 +    List<RunningCompaction> running = getCompactionsRunningOnCompactors();
 +    if (running.isEmpty()) {
 +      LOG.info("No running external compactions found");
 +    } else {
 +      LOG.info("Found {} running external compactions", running.size());
 +      running.forEach(rc -> {
 +        TCompactionStatusUpdate update = new TCompactionStatusUpdate();
 +        update.setState(TCompactionState.IN_PROGRESS);
 +        update.setMessage(RESTART_UPDATE_MSG);
 +        rc.addUpdate(System.currentTimeMillis(), update);
 +        rc.setStartTime(this.coordinatorStartTime);
 +        
RUNNING_CACHE.put(ExternalCompactionId.of(rc.getJob().getExternalCompactionId()),
 rc);
 +        LONG_RUNNING_COMPACTIONS_BY_RG
 +            .computeIfAbsent(rc.getGroup().canonical(), k -> new 
TimeOrderedRunningCompactionSet())
 +            .add(rc);
 +      });
 +    }
 +
 +    startDeadCompactionDetector();
 +    startQueueRunningSummaryLogging();
 +    startFailureSummaryLogging();
 +    startInternalStateCleaner(ctx.getScheduledExecutor());
 +
 +    try {
 +      shutdown.await();
 +    } catch (InterruptedException e) {
 +      LOG.warn("Interrupted waiting for shutdown latch.", e);
 +    }
 +
 +    LOG.info("Shutting down");
 +  }
 +
 +  private Map<String,Set<HostAndPort>> getIdleCompactors(Set<ServerId> 
runningCompactors) {
 +
 +    final Map<String,Set<HostAndPort>> allCompactors = new HashMap<>();
 +    runningCompactors.forEach((csi) -> allCompactors
 +        .computeIfAbsent(csi.getResourceGroup().canonical(), (k) -> new 
HashSet<>())
 +        .add(HostAndPort.fromParts(csi.getHost(), csi.getPort())));
 +
 +    final Set<String> emptyQueues = new HashSet<>();
 +
 +    // Remove all of the compactors that are running a compaction
 +    RUNNING_CACHE.values().forEach(rc -> {
 +      Set<HostAndPort> busyCompactors = 
allCompactors.get(rc.getGroup().canonical());
 +      if (busyCompactors != null
 +          && 
busyCompactors.remove(HostAndPort.fromString(rc.getCompactorAddress()))) {
 +        if (busyCompactors.isEmpty()) {
 +          emptyQueues.add(rc.getGroup().canonical());
 +        }
 +      }
 +    });
 +    // Remove entries with empty queues
 +    emptyQueues.forEach(e -> allCompactors.remove(e));
 +    return allCompactors;
 +  }
 +
  // Starts the dead compaction detector; protected so tests can override with a no-op.
  protected void startDeadCompactionDetector() {
    deadCompactionDetector.start();
  }
 +
  // How long (ms) a group may go without a compactor check-in before warning: three times
  // the configured maximum job wait time.
  protected long getMissingCompactorWarningTime() {
    return this.ctx.getConfiguration().getTimeInMillis(Property.COMPACTOR_MAX_JOB_WAIT_TIME) * 3;
  }
 +
  // Number of compactions believed to be running, per the informational RUNNING_CACHE
  // (which may lag the authoritative metadata table).
  public long getNumRunningCompactions() {
    return RUNNING_CACHE.size();
  }
 +
 +  /**
 +   * Return the next compaction job from the queue to a Compactor
 +   *
 +   * @param groupName group
 +   * @param compactorAddress compactor address
 +   * @throws ThriftSecurityException when permission error
 +   * @return compaction job
 +   */
 +  @Override
 +  public TNextCompactionJob getCompactionJob(TInfo tinfo, TCredentials 
credentials,
 +      String groupName, String compactorAddress, String externalCompactionId)
 +      throws ThriftSecurityException {
 +
 +    // do not expect users to call this directly, expect compactors to call 
this method
 +    if (!security.canPerformSystemActions(credentials)) {
 +      throw new AccumuloSecurityException(credentials.getPrincipal(),
 +          SecurityErrorCode.PERMISSION_DENIED).asThriftException();
 +    }
 +    ResourceGroupId groupId = ResourceGroupId.of(groupName);
 +    LOG.trace("getCompactionJob called for group {} by compactor {}", 
groupId, compactorAddress);
 +    TIME_COMPACTOR_LAST_CHECKED.put(groupId, System.currentTimeMillis());
 +
 +    TExternalCompactionJob result = null;
 +
 +    ResolvedCompactionJob rcJob = (ResolvedCompactionJob) 
jobQueues.poll(groupId);
 +
 +    while (rcJob != null) {
 +
 +      Optional<CompactionConfig> compactionConfig = 
getCompactionConfig(rcJob);
 +
 +      // this method may reread the metadata, do not use the metadata in 
rcJob for anything after
 +      // this method
 +      CompactionMetadata ecm = null;
 +
 +      var kind = rcJob.getKind();
 +
 +      // Only reserve user compactions when the config is present. When 
compactions are canceled the
 +      // config is deleted.
 +      var cid = ExternalCompactionId.from(externalCompactionId);
 +      if (kind == CompactionKind.SYSTEM
 +          || (kind == CompactionKind.USER && compactionConfig.isPresent())) {
 +        ecm = reserveCompaction(rcJob, compactorAddress, cid);
 +      }
 +
 +      if (ecm != null) {
 +        result = createThriftJob(externalCompactionId, ecm, rcJob, 
compactionConfig);
 +        // It is possible that by the time this added that the the compactor 
that made this request
 +        // is dead. In this cases the compaction is not actually running.
 +        
RUNNING_CACHE.put(ExternalCompactionId.of(result.getExternalCompactionId()),
 +            new RunningCompaction(result, compactorAddress, groupId));
 +        TabletLogger.compacting(rcJob.getExtent(), rcJob.getSelectedFateId(), 
cid, compactorAddress,
 +            rcJob, ecm.getCompactTmpName());
 +        break;
 +      } else {
 +        LOG.debug(
 +            "Unable to reserve compaction job for {}, pulling another off the 
queue for group {}",
 +            rcJob.getExtent(), groupName);
 +        rcJob = (ResolvedCompactionJob) 
jobQueues.poll(ResourceGroupId.of(groupName));
 +      }
 +    }
 +
 +    if (rcJob == null) {
 +      LOG.trace("No jobs found in group {} ", groupName);
 +    }
 +
 +    if (result == null) {
 +      LOG.trace("No jobs found for group {}, returning empty job to compactor 
{}", groupName,
 +          compactorAddress);
 +      result = new TExternalCompactionJob();
 +    }
 +
 +    return new TNextCompactionJob(result, compactorCounts.get(groupId));
 +  }
 +
 +  private void checkTabletDir(KeyExtent extent, Path path) {
 +    try {
 +      if (tabletDirCache.getIfPresent(path) == null) {
 +        FileStatus[] files = null;
 +        try {
 +          files = ctx.getVolumeManager().listStatus(path);
 +        } catch (FileNotFoundException ex) {
 +          // ignored
 +        }
 +
 +        if (files == null) {
 +          LOG.debug("Tablet {} had no dir, creating {}", extent, path);
 +
 +          ctx.getVolumeManager().mkdirs(path);
 +        }
 +        tabletDirCache.put(path, 1);
 +      }
 +    } catch (IOException e) {
 +      throw new UncheckedIOException(e);
 +    }
 +  }
 +
 +  protected CompactionMetadata 
createExternalCompactionMetadata(ResolvedCompactionJob job,
 +      String compactorAddress, ExternalCompactionId externalCompactionId) {
 +    boolean propDels = !job.isCompactingAll();
 +    FateId fateId = job.getSelectedFateId();
 +
 +    Consumer<String> directoryCreator = dir -> 
checkTabletDir(job.getExtent(), new Path(dir));
 +    ReferencedTabletFile newFile = 
TabletNameGenerator.getNextDataFilenameForMajc(propDels, ctx,
 +        job.getExtent(), job.getTabletDir(), directoryCreator, 
externalCompactionId);
 +
 +    return new CompactionMetadata(job.getJobFiles(), newFile, 
compactorAddress, job.getKind(),
 +        job.getPriority(), job.getGroup(), propDels, fateId);
 +
 +  }
 +
 +  private class ReserveCompactionTask implements Supplier<CompactionMetadata> 
{
 +    private final ResolvedCompactionJob rcJob;
 +    private final String compactorAddress;
 +    private final ExternalCompactionId externalCompactionId;
 +
 +    private ReserveCompactionTask(ResolvedCompactionJob rcJob, String 
compactorAddress,
 +        ExternalCompactionId externalCompactionId) {
 +      Preconditions.checkArgument(
 +          rcJob.getKind() == CompactionKind.SYSTEM || rcJob.getKind() == 
CompactionKind.USER);
 +      this.rcJob = Objects.requireNonNull(rcJob);
 +      this.compactorAddress = Objects.requireNonNull(compactorAddress);
 +      this.externalCompactionId = 
Objects.requireNonNull(externalCompactionId);
 +      
Preconditions.checkState(activeCompactorReservationRequest.add(compactorAddress),
 +          "compactor %s already on has a reservation in flight, cannot 
process %s",
 +          compactorAddress, externalCompactionId);
 +    }
 +
 +    @Override
 +    public CompactionMetadata get() {
 +      if (ctx.getTableState(rcJob.getExtent().tableId()) != 
TableState.ONLINE) {
 +        return null;
 +      }
 +
 +      try {
 +        try (var tabletsMutator = 
ctx.getAmple().conditionallyMutateTablets()) {
 +          var extent = rcJob.getExtent();
 +          var jobFiles = rcJob.getJobFiles();
 +          long selectedExpirationDuration = 
ctx.getTableConfiguration(extent.tableId())
 +              
.getTimeInMillis(Property.TABLE_COMPACTION_SELECTION_EXPIRATION);
 +          var reservationCheck = new 
CompactionReservationCheck(rcJob.getKind(), jobFiles,
 +              rcJob.getSelectedFateId(), rcJob.isOverlapsSelectedFiles(), 
manager.getSteadyTime(),
 +              selectedExpirationDuration);
 +          var tabletMutator = 
tabletsMutator.mutateTablet(extent).requireAbsentOperation()
 +              .requireCheckSuccess(reservationCheck);
 +
 +          var ecm = createExternalCompactionMetadata(rcJob, compactorAddress, 
externalCompactionId);
 +
 +          if (rcJob.isOverlapsSelectedFiles()) {
 +            // There is corresponding code in CompactionReservationCheck that 
ensures this delete is
 +            // safe to do.
 +            tabletMutator.deleteSelectedFiles();
 +          }
 +          tabletMutator.putExternalCompaction(externalCompactionId, ecm);
 +
 +          tabletMutator.submit(tm -> 
tm.getExternalCompactions().containsKey(externalCompactionId),
 +              () -> "compaction reservation");
 +
 +          var result = tabletsMutator.process().get(extent);
 +
 +          if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) {
 +            return ecm;
 +          } else {
 +            return null;
 +          }
 +        }
 +      } finally {
 +        
Preconditions.checkState(activeCompactorReservationRequest.remove(compactorAddress),
 +            "compactorAddress:%s", compactorAddress);
 +      }
 +    }
 +  }
 +
 +  protected CompactionMetadata reserveCompaction(ResolvedCompactionJob rcJob,
 +      String compactorAddress, ExternalCompactionId externalCompactionId) {
 +
 +    if (activeCompactorReservationRequest.contains(compactorAddress)) {
 +      // In this case the compactor has a previously submitted reservation 
request that is still
 +      // processing. Do not want to let it queue up another reservation 
request. One possible cause
 +      // of this is that compactor timed out waiting for its last request to 
process and is now
 +      // making another request. The previously submitted request can not be 
used because the
 +      // compactor generates a new uuid for each request it makes. So the 
best thing to do is to
 +      // return null and wait for this situation to resolve. This will likely 
happen when some part
 +      // of the distributed system is not working well, so at this point want 
to avoid making
 +      // problems worse instead of trying to reserve a job.
 +      LOG.warn(
 +          "Ignoring request from {} to reserve compaction job because it has 
a reservation request in progress.",
 +          compactorAddress);
 +      return null;
 +    }
 +
 +    var dataLevel = DataLevel.of(rcJob.getExtent().tableId());
 +    var future = CompletableFuture.supplyAsync(
 +        new ReserveCompactionTask(rcJob, compactorAddress, 
externalCompactionId),
 +        reservationPools.get(dataLevel));
 +    return future.join();
 +  }
 +
  /**
   * Converts a reserved compaction job into the thrift form handed to a compactor, computing
   * plugin overrides, iterator settings, and the input file list.
   */
  protected TExternalCompactionJob createThriftJob(String externalCompactionId,
      CompactionMetadata ecm, ResolvedCompactionJob rcJob,
      Optional<CompactionConfig> compactionConfig) {

    // Only reach out to metadata table and get these if requested, usually not needed unless
    // plugin requests it.
    Supplier<Set<CompactableFile>> selectedFiles = Suppliers.memoize(() -> {
      if (rcJob.getKind() == CompactionKind.SYSTEM) {
        return Set.of();
      } else {
        var tabletMetadata =
            ctx.getAmple().readTablet(rcJob.getExtent(), SELECTED, FILES, PREV_ROW);
        // The selection must still belong to the fate transaction that made it.
        Preconditions.checkState(
            tabletMetadata.getSelectedFiles().getFateId().equals(rcJob.getSelectedFateId()));
        return tabletMetadata.getSelectedFiles().getFiles().stream()
            .map(file -> new CompactableFileImpl(file, tabletMetadata.getFilesMap().get(file)))
            .collect(Collectors.toUnmodifiableSet());
      }
    });

    Map<String,String> overrides = CompactionPluginUtils.computeOverrides(compactionConfig, ctx,
        rcJob.getExtent(), rcJob.getFiles(), selectedFiles, ecm.getCompactTmpName());

    IteratorConfig iteratorSettings = SystemIteratorUtil
        .toIteratorConfig(compactionConfig.map(CompactionConfig::getIterators).orElse(List.of()));

    var files = rcJob.getJobFilesMap().entrySet().stream().map(e -> {
      StoredTabletFile file = e.getKey();
      DataFileValue dfv = e.getValue();
      return new InputFile(file.getMetadata(), dfv.getSize(), dfv.getNumEntries(), dfv.getTime());
    }).collect(toList());

    // The fateId here corresponds to the Fate transaction that is driving a user initiated
    // compaction. A system initiated compaction has no Fate transaction driving it so its ok to set
    // it to null. If anything tries to use the id for a system compaction and triggers a NPE it's
    // probably a bug that needs to be fixed.
    FateId fateId = null;
    if (rcJob.getKind() == CompactionKind.USER) {
      fateId = rcJob.getSelectedFateId();
    }

    return new TExternalCompactionJob(externalCompactionId, rcJob.getExtent().toThrift(), files,
        iteratorSettings, ecm.getCompactTmpName().getNormalizedPathStr(), ecm.getPropagateDeletes(),
        TCompactionKind.valueOf(ecm.getKind().name()), fateId == null ? null : fateId.toThrift(),
        overrides);
  }
 +
  /** Registers this coordinator's compaction queue metrics with the given meter registry. */
  @Override
  public void registerMetrics(MeterRegistry registry) {
    queueMetrics.registerMetrics(registry);
  }
 +
 +  public void addJobs(TabletMetadata tabletMetadata, 
Collection<CompactionJob> jobs) {
 +    ArrayList<CompactionJob> resolvedJobs = new ArrayList<>(jobs.size());
 +    for (var job : jobs) {
 +      resolvedJobs.add(new ResolvedCompactionJob(job, tabletMetadata));
 +    }
 +
 +    jobQueues.add(tabletMetadata.getExtent(), resolvedJobs);
 +  }
 +
  /** Returns this coordinator as the thrift service handler; it implements the interface itself. */
  public CompactionCoordinatorService.Iface getThriftService() {
    return this;
  }
 +
 +  private Optional<CompactionConfig> 
getCompactionConfig(ResolvedCompactionJob rcJob) {
 +    if (rcJob.getKind() == CompactionKind.USER) {
 +      var cconf = compactionConfigCache.get(rcJob.getSelectedFateId());
 +      return Optional.ofNullable(cconf);
 +    }
 +    return Optional.empty();
 +  }
 +
  /**
   * Compactors call this method when they have finished a compaction. This method does the
   * following.
   *
   * <ol>
   * <li>Reads the tablets metadata and determines if the compaction can commit. Its possible that
   * things changed while the compaction was running and it can no longer commit.</li>
   * <li>Commit the compaction using a conditional mutation. If the tablets files or location
   * changed since reading the tablets metadata, then conditional mutation will fail. When this
   * happens it will reread the metadata and go back to step 1 conceptually. When committing a
   * compaction the compacted files are removed and scan entries are added to the tablet in case the
   * files are in use, this prevents GC from deleting the files between updating tablet metadata and
   * refreshing the tablet. The scan entries are only added when a tablet has a location.</li>
   * <li>After successful commit a refresh request is sent to the tablet if it has a location. This
   * will cause the tablet to start using the newly compacted files for future scans. Also the
   * tablet can delete the scan entries if there are no active scans using them.</li>
   * </ol>
   *
   * <p>
   * User compactions will be refreshed as part of the fate operation. The user compaction fate
   * operation will see the compaction was committed after this code updates the tablet metadata,
   * however if it were to rely on this code to do the refresh it would not be able to know when the
   * refresh was actually done. Therefore, user compactions will refresh as part of the fate
   * operation so that it's known to be done before the fate operation returns. Since the fate
   * operation will do it, there is no need to do it here for user compactions.
   *
   * @param tinfo trace info
   * @param credentials tcredentials object
   * @param externalCompactionId compaction id
   * @param textent tablet extent
   * @param stats compaction stats
   * @throws ThriftSecurityException when permission error
   */
  @Override
  public void compactionCompleted(TInfo tinfo, TCredentials credentials,
      String externalCompactionId, TKeyExtent textent, TCompactionStats stats)
      throws ThriftSecurityException {
    // do not expect users to call this directly, expect other tservers to call this method
    if (!security.canPerformSystemActions(credentials)) {
      throw new AccumuloSecurityException(credentials.getPrincipal(),
          SecurityErrorCode.PERMISSION_DENIED).asThriftException();
    }

    // maybe fate has not started yet
    var extent = KeyExtent.fromThrift(textent);
    var fateType = FateInstanceType.fromTableId(extent.tableId());
    var localFate = fateClients.apply(fateType);

    LOG.info("Compaction completed, id: {}, stats: {}, extent: {}", externalCompactionId, stats,
        extent);
    final var ecid = ExternalCompactionId.of(externalCompactionId);
    // update success counters for queue/compactor/table stats logging
    captureSuccess(ecid, extent);
    var tabletMeta =
        ctx.getAmple().readTablet(extent, ECOMP, SELECTED, LOCATION, FILES, COMPACTED, OPID);

    var tableState = manager.getContext().getTableState(extent.tableId());
    if (tableState != TableState.ONLINE) {
      // Its important this check is done after the compaction id is set in the metadata table to
      // avoid race conditions with the client code that waits for tables to go offline. That code
      // looks for compaction ids in the metadata table after setting the table state. When that
      // client code sees nothing for a tablet its important that nothing will change the tablet's
      // files after that point in time, which this check ensures.
      LOG.debug("Not committing compaction {} for {} because of table state {}", ecid, extent,
          tableState);
      // cleanup metadata table and files related to the compaction
      compactionsFailed(Map.of(ecid, extent));
      return;
    }

    if (!CommitCompaction.canCommitCompaction(ecid, tabletMeta)) {
      return;
    }

    // Start a fate transaction to commit the compaction.
    CompactionMetadata ecm = tabletMeta.getExternalCompactions().get(ecid);
    var renameOp = new RenameCompactionFile(new CompactionCommitData(ecid, extent, ecm, stats));
    // seed with a FateKey so a duplicate completion report cannot start a second commit
    localFate.seedTransaction(Fate.FateOperation.COMMIT_COMPACTION,
        FateKey.forCompactionCommit(ecid), renameOp, true);
  }
 +
 +  @Override
 +  public void compactionFailed(TInfo tinfo, TCredentials credentials, String 
externalCompactionId,
 +      TKeyExtent extent, String exceptionMessage, TCompactionState 
failureState)
 +      throws ThriftSecurityException {
 +    // do not expect users to call this directly, expect other tservers to 
call this method
 +    if (!security.canPerformSystemActions(credentials)) {
 +      throw new AccumuloSecurityException(credentials.getPrincipal(),
 +          SecurityErrorCode.PERMISSION_DENIED).asThriftException();
 +    }
 +    if (failureState != TCompactionState.CANCELLED || failureState != 
TCompactionState.FAILED) {
 +      LOG.error("Unexpected failure state sent to compactionFailed: {}. This 
is likely a bug.",
 +          failureState);
 +    }
 +    KeyExtent fromThriftExtent = KeyExtent.fromThrift(extent);
 +    LOG.info("Compaction {}: id: {}, extent: {}, compactor exception:{}", 
failureState,
 +        externalCompactionId, fromThriftExtent, exceptionMessage);
 +    final var ecid = ExternalCompactionId.of(externalCompactionId);
 +    if (failureState == TCompactionState.FAILED) {
 +      captureFailure(ecid, fromThriftExtent);
 +    }
 +    compactionsFailed(Map.of(ecid, KeyExtent.fromThrift(extent)));
 +  }
 +
 +  private void captureFailure(ExternalCompactionId ecid, KeyExtent extent) {
 +    var rc = RUNNING_CACHE.get(ecid);
 +    if (rc != null) {
 +      failingQueues.compute(rc.getGroup(), FailureCounts::incrementFailure);
 +      final String compactor = rc.getCompactorAddress();
 +      failingCompactors.compute(compactor, FailureCounts::incrementFailure);
 +    }
 +    failingTables.compute(extent.tableId(), FailureCounts::incrementFailure);
 +  }
 +
 +  protected void startQueueRunningSummaryLogging() {
 +    CoordinatorSummaryLogger summaryLogger =
 +        new CoordinatorSummaryLogger(ctx, this.jobQueues, this.RUNNING_CACHE, 
compactorCounts);
 +
 +    ScheduledFuture<?> future = ctx.getScheduledExecutor()
 +        .scheduleWithFixedDelay(summaryLogger::logSummary, 0, 1, 
TimeUnit.MINUTES);
 +    ThreadPools.watchNonCriticalScheduledTask(future);
 +  }
 +
 +  protected void startFailureSummaryLogging() {
 +    ScheduledFuture<?> future =
 +        ctx.getScheduledExecutor().scheduleWithFixedDelay(this::printStats, 
0, 5, TimeUnit.MINUTES);
 +    ThreadPools.watchNonCriticalScheduledTask(future);
 +  }
 +
 +  private <T> void printStats(String logPrefix, 
ConcurrentHashMap<T,FailureCounts> failureCounts,
 +      boolean logSuccessAtTrace) {
 +    for (var key : failureCounts.keySet()) {
 +      failureCounts.compute(key, (k, counts) -> {
 +        if (counts != null) {
-           Level level;
++          ConditionalLogAction logAction = Logger::debug;
 +          if (counts.failures > 0) {
-             level = Level.WARN;
++            logAction = Logger::warn;
 +          } else if (logSuccessAtTrace) {
-             level = Level.TRACE;
-           } else {
-             level = Level.DEBUG;
++            logAction = Logger::trace;
 +          }
 +
-           LOG.atLevel(level).log("{} {} failures:{} successes:{} since last 
time this was logged ",
++          logAction.log(LOG, "{} {} failures:{} successes:{} since last time 
this was logged ",
 +              logPrefix, k, counts.failures, counts.successes);
 +        }
 +
 +        // clear the counts so they can start building up for the next 
logging if this key is ever
 +        // used again
 +        return null;
 +      });
 +    }
 +  }
 +
 +  private void printStats() {
 +    // Remove down compactors from failing list
 +    Map<String,Set<HostAndPort>> allCompactors = 
ExternalCompactionUtil.getCompactorAddrs(ctx);
 +    Set<String> allCompactorAddrs = new HashSet<>();
 +    allCompactors.values().forEach(l -> l.forEach(c -> 
allCompactorAddrs.add(c.toString())));
 +    failingCompactors.keySet().retainAll(allCompactorAddrs);
 +    printStats("Queue", failingQueues, false);
 +    printStats("Table", failingTables, false);
 +    printStats("Compactor", failingCompactors, true);
 +  }
 +
 +  private void captureSuccess(ExternalCompactionId ecid, KeyExtent extent) {
 +    var rc = RUNNING_CACHE.get(ecid);
 +    if (rc != null) {
 +      failingQueues.compute(rc.getGroup(), FailureCounts::incrementSuccess);
 +      final String compactor = rc.getCompactorAddress();
 +      failingCompactors.compute(compactor, FailureCounts::incrementSuccess);
 +    }
 +    failingTables.compute(extent.tableId(), FailureCounts::incrementSuccess);
 +  }
 +
 +  void compactionsFailed(Map<ExternalCompactionId,KeyExtent> compactions) {
 +    // Need to process each level by itself because the conditional tablet 
mutator does not support
 +    // mutating multiple data levels at the same time. Also the conditional 
tablet mutator does not
 +    // support submitting multiple mutations for a single tablet, so need to 
group by extent.
 +
 +    Map<DataLevel,Map<KeyExtent,Set<ExternalCompactionId>>> 
groupedCompactions =
 +        new EnumMap<>(DataLevel.class);
 +
 +    compactions.forEach((ecid, extent) -> {
 +      groupedCompactions.computeIfAbsent(DataLevel.of(extent.tableId()), dl 
-> new HashMap<>())
 +          .computeIfAbsent(extent, e -> new HashSet<>()).add(ecid);
 +    });
 +
 +    groupedCompactions
 +        .forEach((dataLevel, levelCompactions) -> 
compactionFailedForLevel(levelCompactions));
 +  }
 +
  /**
   * Removes the given failed/cancelled compactions from tablet metadata for a single data level,
   * then deletes any compaction tmp files left in the tablet directories, and finally records each
   * compaction as completed so it leaves the running cache.
   */
  void compactionFailedForLevel(Map<KeyExtent,Set<ExternalCompactionId>> compactions) {

    try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) {
      compactions.forEach((extent, ecids) -> {
        try {
          ctx.requireNotDeleted(extent.tableId());
          // require the compactions to still be present so the delete only succeeds when the
          // metadata has not changed underneath us
          var mutator = tabletsMutator.mutateTablet(extent).requireAbsentOperation();
          ecids.forEach(mutator::requireCompaction);
          ecids.forEach(mutator::deleteExternalCompaction);
          mutator.submit(new RejectionHandler() {
            @Override
            public boolean callWhenTabletDoesNotExists() {
              return true;
            }

            @Override
            public boolean test(TabletMetadata tabletMetadata) {
              // a missing tablet or a tablet with none of these ecids counts as already cleaned up
              return tabletMetadata == null
                  || Collections.disjoint(tabletMetadata.getExternalCompactions().keySet(), ecids);
            }

          });
        } catch (TableDeletedException e) {
          LOG.warn("Table {} was deleted, unable to update metadata for compaction failure.",
              extent.tableId());
        }
      });

      // reused buffer for the ecids of the tablet currently being processed
      final List<ExternalCompactionId> ecidsForTablet = new ArrayList<>();
      tabletsMutator.process().forEach((extent, result) -> {
        if (result.getStatus() != Ample.ConditionalResult.Status.ACCEPTED) {

          // this should try again later when the dead compaction detector runs, lets log it in case
          // its a persistent problem
          if (LOG.isDebugEnabled()) {
            LOG.debug("Unable to remove failed compaction {} {}", extent, compactions.get(extent));
          }
        } else {
          // compactionFailed is called from the Compactor when either a compaction fails or
          // is cancelled and it's called from the DeadCompactionDetector. This block is
          // entered when the conditional mutator above successfully deletes an ecid from
          // the tablet metadata. Remove compaction tmp files from the tablet directory
          // that have a corresponding ecid in the name.

          ecidsForTablet.clear();
          ecidsForTablet.addAll(compactions.get(extent));

          if (!ecidsForTablet.isEmpty()) {
            final TabletMetadata tm = ctx.getAmple().readTablet(extent, ColumnType.DIR);
            if (tm != null) {
              final Collection<Volume> vols = ctx.getVolumeManager().getVolumes();
              for (Volume vol : vols) {
                try {
                  final String volPath =
                      vol.getBasePath() + Constants.HDFS_TABLES_DIR + Path.SEPARATOR
                          + extent.tableId().canonical() + Path.SEPARATOR + tm.getDirName();
                  final FileSystem fs = vol.getFileSystem();
                  for (ExternalCompactionId ecid : ecidsForTablet) {
                    // tmp files for a compaction end with "_tmp_<ecid>"
                    final String fileSuffix = "_tmp_" + ecid.canonical();
                    FileStatus[] files = null;
                    try {
                      files = fs.listStatus(new Path(volPath),
                          (path) -> path.getName().endsWith(fileSuffix));
                    } catch (FileNotFoundException e) {
                      // directory may legitimately not exist on this volume
                      LOG.trace("Failed to list tablet dir {}", volPath, e);
                    }
                    if (files != null) {
                      for (FileStatus file : files) {
                        if (!fs.delete(file.getPath(), false)) {
                          LOG.warn("Unable to delete ecid tmp file: {}: ", file.getPath());
                        } else {
                          LOG.debug("Deleted ecid tmp file: {}", file.getPath());
                        }
                      }
                    }
                  }
                } catch (IOException e) {
                  LOG.error("Exception deleting compaction tmp files for tablet: {}", extent, e);
                }
              }
            } else {
              // TabletMetadata does not exist for the extent. This could be due to a merge or
              // split operation. Use the utility to find tmp files at the table level
              deadCompactionDetector.addTableId(extent.tableId());
            }
          }
        }
      });
    }

    compactions.values().forEach(ecids -> ecids.forEach(this::recordCompletion));
  }
 +
  /**
   * Compactor calls to update the status of the assigned compaction
   *
   * @param tinfo trace info
   * @param credentials tcredentials object
   * @param externalCompactionId compaction id
   * @param update compaction status update
   * @param timestamp timestamp of the message
   * @throws ThriftSecurityException when permission error
   */
  @Override
  public void updateCompactionStatus(TInfo tinfo, TCredentials credentials,
      String externalCompactionId, TCompactionStatusUpdate update, long timestamp)
      throws ThriftSecurityException {
    // do not expect users to call this directly, expect other tservers to call this method
    if (!security.canPerformSystemActions(credentials)) {
      throw new AccumuloSecurityException(credentials.getPrincipal(),
          SecurityErrorCode.PERMISSION_DENIED).asThriftException();
    }
    LOG.debug("Compaction status update, id: {}, timestamp: {}, update: {}", externalCompactionId,
        timestamp, update);
    final RunningCompaction rc = RUNNING_CACHE.get(ExternalCompactionId.of(externalCompactionId));
    if (null != rc) {
      rc.addUpdate(timestamp, update);
      switch (update.state) {
        case STARTED:
          // begin tracking in the long-running set for this compaction's resource group
          LONG_RUNNING_COMPACTIONS_BY_RG.computeIfAbsent(rc.getGroup().canonical(),
              k -> new TimeOrderedRunningCompactionSet()).add(rc);
          break;
        case CANCELLED:
        case FAILED:
        case SUCCEEDED:
          // terminal states: stop tracking this compaction as long running
          var compactionSet = LONG_RUNNING_COMPACTIONS_BY_RG.get(rc.getGroup().canonical());
          if (compactionSet != null) {
            compactionSet.remove(rc);
          }
          break;
        case ASSIGNED:
        case IN_PROGRESS:
        default:
          // do nothing
          break;

      }
    }
  }
 +
 +  public void recordCompletion(ExternalCompactionId ecid) {
 +    var rc = RUNNING_CACHE.remove(ecid);
 +    if (rc != null) {
 +      completed.put(ecid, rc);
 +      var compactionSet = 
LONG_RUNNING_COMPACTIONS_BY_RG.get(rc.getGroup().canonical());
 +      if (compactionSet != null) {
 +        compactionSet.remove(rc);
 +      }
 +    }
 +  }
 +
 +  protected Set<ExternalCompactionId> readExternalCompactionIds() {
 +    try (TabletsMetadata tabletsMetadata =
 +        this.ctx.getAmple().readTablets().forLevel(Ample.DataLevel.USER)
 +            .filter(new HasExternalCompactionsFilter()).fetch(ECOMP).build()) 
{
 +      return tabletsMetadata.stream().flatMap(tm -> 
tm.getExternalCompactions().keySet().stream())
 +          .collect(Collectors.toSet());
 +    }
 +  }
 +
 +  /**
 +   * Return information about running compactions
 +   *
 +   * @param tinfo trace info
 +   * @param credentials tcredentials object
 +   * @return map of ECID to TExternalCompaction objects
 +   * @throws ThriftSecurityException permission error
 +   */
 +  @Override
 +  public TExternalCompactionMap getRunningCompactions(TInfo tinfo, 
TCredentials credentials)
 +      throws ThriftSecurityException {
 +    // do not expect users to call this directly, expect other tservers to 
call this method
 +    if (!security.canPerformSystemActions(credentials)) {
 +      throw new AccumuloSecurityException(credentials.getPrincipal(),
 +          SecurityErrorCode.PERMISSION_DENIED).asThriftException();
 +    }
 +
 +    final TExternalCompactionMap result = new TExternalCompactionMap();
 +    RUNNING_CACHE.forEach((ecid, rc) -> {
 +      TExternalCompaction trc = new TExternalCompaction();
 +      trc.setGroupName(rc.getGroup().canonical());
 +      trc.setCompactor(rc.getCompactorAddress());
 +      trc.setUpdates(rc.getUpdates());
 +      trc.setJob(rc.getJob());
 +      result.putToCompactions(ecid.canonical(), trc);
 +    });
 +    return result;
 +  }
 +
 +  /**
 +   * Return top 50 longest running compactions for each resource group
 +   *
 +   * @param tinfo trace info
 +   * @param credentials tcredentials object
 +   * @return map of group name to list of up to 50 compactions in sorted 
order, oldest compaction
 +   *         first.
 +   * @throws ThriftSecurityException permission error
 +   */
 +  @Override
 +  public Map<String,TExternalCompactionList> getLongRunningCompactions(TInfo 
tinfo,
 +      TCredentials credentials) throws ThriftSecurityException {
 +    // do not expect users to call this directly, expect other tservers to 
call this method
 +    if (!security.canPerformSystemActions(credentials)) {
 +      throw new AccumuloSecurityException(credentials.getPrincipal(),
 +          SecurityErrorCode.PERMISSION_DENIED).asThriftException();
 +    }
 +
 +    final Map<String,TExternalCompactionList> result = new HashMap<>();
 +
 +    for (Entry<String,TimeOrderedRunningCompactionSet> e : 
LONG_RUNNING_COMPACTIONS_BY_RG
 +        .entrySet()) {
 +      final TExternalCompactionList compactions = new 
TExternalCompactionList();
 +      Iterator<RunningCompaction> iter = e.getValue().iterator();
 +      while (iter.hasNext()) {
 +        RunningCompaction rc = iter.next();
 +        TExternalCompaction trc = new TExternalCompaction();
 +        trc.setGroupName(rc.getGroup().canonical());
 +        trc.setCompactor(rc.getCompactorAddress());
 +        trc.setUpdates(rc.getUpdates());
 +        trc.setJob(rc.getJob());
 +        compactions.addToCompactions(trc);
 +      }
 +      result.put(e.getKey(), compactions);
 +    }
 +    return result;
 +  }
 +
 +  /**
 +   * Return information about recently completed compactions
 +   *
 +   * @param tinfo trace info
 +   * @param credentials tcredentials object
 +   * @return map of ECID to TExternalCompaction objects
 +   * @throws ThriftSecurityException permission error
 +   */
 +  @Override
 +  public TExternalCompactionMap getCompletedCompactions(TInfo tinfo, 
TCredentials credentials)
 +      throws ThriftSecurityException {
 +    // do not expect users to call this directly, expect other tservers to 
call this method
 +    if (!security.canPerformSystemActions(credentials)) {
 +      throw new AccumuloSecurityException(credentials.getPrincipal(),
 +          SecurityErrorCode.PERMISSION_DENIED).asThriftException();
 +    }
 +    final TExternalCompactionMap result = new TExternalCompactionMap();
 +    completed.asMap().forEach((ecid, rc) -> {
 +      TExternalCompaction trc = new TExternalCompaction();
 +      trc.setGroupName(rc.getGroup().canonical());
 +      trc.setCompactor(rc.getCompactorAddress());
 +      trc.setJob(rc.getJob());
 +      trc.setUpdates(rc.getUpdates());
 +      result.putToCompactions(ecid.canonical(), trc);
 +    });
 +    return result;
 +  }
 +
 +  @Override
 +  public void cancel(TInfo tinfo, TCredentials credentials, String 
externalCompactionId)
 +      throws TException {
 +    var runningCompaction = 
RUNNING_CACHE.get(ExternalCompactionId.of(externalCompactionId));
 +    var extent = KeyExtent.fromThrift(runningCompaction.getJob().getExtent());
 +    try {
 +      NamespaceId nsId = this.ctx.getNamespaceId(extent.tableId());
 +      if (!security.canCompact(credentials, extent.tableId(), nsId)) {
 +        throw new AccumuloSecurityException(credentials.getPrincipal(),
 +            SecurityErrorCode.PERMISSION_DENIED).asThriftException();
 +      }
 +    } catch (TableNotFoundException e) {
 +      throw new ThriftTableOperationException(extent.tableId().canonical(), 
null,
 +          TableOperation.COMPACT_CANCEL, 
TableOperationExceptionType.NOTFOUND, e.getMessage());
 +    }
 +
 +    cancelCompactionOnCompactor(runningCompaction.getCompactorAddress(), 
externalCompactionId);
 +  }
 +
  /* Method exists to be called from test */
  public CompactionJobQueues getJobQueues() {
    return jobQueues;
  }
 +
  /* Method exists to be overridden in test to hide static method */
  protected List<RunningCompaction> getCompactionsRunningOnCompactors() {
    return ExternalCompactionUtil.getCompactionsRunningOnCompactors(this.ctx);
  }
 +
  /* Method exists to be overridden in test to hide static method */
  protected Set<ServerId> getRunningCompactors() {
    return ctx.instanceOperations().getServers(ServerId.Type.COMPACTOR);
  }
 +
 +  /* Method exists to be overridden in test to hide static method */
 +  protected void cancelCompactionOnCompactor(String address, String 
externalCompactionId) {
 +    HostAndPort hostPort = HostAndPort.fromString(address);
 +    ExternalCompactionUtil.cancelCompaction(this.ctx, hostPort, 
externalCompactionId);
 +  }
 +
  /**
   * Deletes the given ZooKeeper node, tolerating the case where another process concurrently added
   * a child (NotEmptyException) — an expected race, logged at debug and otherwise ignored.
   */
  private void deleteEmpty(ZooReaderWriter zoorw, String path)
      throws KeeperException, InterruptedException {
    try {
      LOG.debug("Deleting empty ZK node {}", path);
      zoorw.delete(path);
    } catch (KeeperException.NotEmptyException e) {
      LOG.debug("Failed to delete {} its not empty, likely an expected race condition.", path);
    }
  }
 +
 +  private void cleanUpEmptyCompactorPathInZK() {
 +
 +    final var zoorw = this.ctx.getZooSession().asReaderWriter();
 +
 +    try {
 +      var groups = zoorw.getChildren(Constants.ZCOMPACTORS);
 +
 +      for (String group : groups) {
 +        final String qpath = Constants.ZCOMPACTORS + "/" + group;
 +        final ResourceGroupId cgid = ResourceGroupId.of(group);
 +        final var compactors = zoorw.getChildren(qpath);
 +
 +        if (compactors.isEmpty()) {
 +          deleteEmpty(zoorw, qpath);
 +          // Group has no compactors, we can clear its
 +          // associated priority queue of jobs
 +          CompactionJobPriorityQueue queue = getJobQueues().getQueue(cgid);
 +          if (queue != null) {
 +            queue.clearIfInactive(Duration.ofMinutes(10));
 +          }
 +        } else {
 +          for (String compactor : compactors) {
 +            String cpath = Constants.ZCOMPACTORS + "/" + group + "/" + 
compactor;
 +            var lockNodes =
 +                zoorw.getChildren(Constants.ZCOMPACTORS + "/" + group + "/" + 
compactor);
 +            if (lockNodes.isEmpty()) {
 +              deleteEmpty(zoorw, cpath);
 +            }
 +          }
 +        }
 +      }
 +    } catch (KeeperException | RuntimeException e) {
 +      LOG.warn("Failed to clean up compactors", e);
 +    } catch (InterruptedException e) {
 +      Thread.currentThread().interrupt();
 +      throw new IllegalStateException(e);
 +    }
 +  }
 +
 +  private Set<ResourceGroupId> getCompactionServicesConfigurationGroups()
 +      throws ReflectiveOperationException, IllegalArgumentException, 
SecurityException {
 +
 +    Set<ResourceGroupId> groups = new HashSet<>();
 +    AccumuloConfiguration config = ctx.getConfiguration();
 +    CompactionServicesConfig servicesConfig = new 
CompactionServicesConfig(config);
 +
 +    for (var entry : servicesConfig.getPlanners().entrySet()) {
 +      String serviceId = entry.getKey();
 +      String plannerClassName = entry.getValue();
 +
 +      Class<? extends CompactionPlanner> plannerClass =
 +          Class.forName(plannerClassName).asSubclass(CompactionPlanner.class);
 +      CompactionPlanner planner = 
plannerClass.getDeclaredConstructor().newInstance();
 +
 +      var initParams = new 
CompactionPlannerInitParams(CompactionServiceId.of(serviceId),
 +          servicesConfig.getPlannerPrefix(serviceId), 
servicesConfig.getOptions().get(serviceId),
 +          new ServiceEnvironmentImpl(ctx));
 +
 +      planner.init(initParams);
 +
 +      groups.addAll(initParams.getRequestedGroups());
 +    }
 +    return groups;
 +  }
 +
 +  public void cleanUpInternalState() {
 +
 +    // This method does the following:
 +    //
 +    // 1. Removes entries from RUNNING_CACHE and 
LONG_RUNNING_COMPACTIONS_BY_RG that are not really
 +    // running
 +    // 2. Cancels running compactions for groups that are not in the current 
configuration
 +    // 3. Remove groups not in configuration from TIME_COMPACTOR_LAST_CHECKED
 +    // 4. Log groups with no compactors
 +    // 5. Log compactors with no groups
 +    // 6. Log groups with compactors and queued jos that have not checked in
 +
 +    var config = ctx.getConfiguration();
 +    ThreadPools.resizePool(reservationPools.get(DataLevel.ROOT), config,
 +        Property.COMPACTION_COORDINATOR_RESERVATION_THREADS_ROOT);
 +    ThreadPools.resizePool(reservationPools.get(DataLevel.METADATA), config,
 +        Property.COMPACTION_COORDINATOR_RESERVATION_THREADS_META);
 +    ThreadPools.resizePool(reservationPools.get(DataLevel.USER), config,
 +        Property.COMPACTION_COORDINATOR_RESERVATION_THREADS_USER);
 +
 +    // grab a snapshot of the ids in the set before reading the metadata 
table. This is done to
 +    // avoid removing things that are added while reading the metadata.
 +    final Set<ExternalCompactionId> idsSnapshot = 
Set.copyOf(Sets.union(RUNNING_CACHE.keySet(),
 +        LONG_RUNNING_COMPACTIONS_BY_RG.values().stream()
 +            .flatMap(TimeOrderedRunningCompactionSet::stream)
 +            .map(rc -> 
rc.getJob().getExternalCompactionId()).map(ExternalCompactionId::of)
 +            .collect(Collectors.toSet())));
 +
 +    // grab the ids that are listed as running in the metadata table. It 
important that this is done
 +    // after getting the snapshot.
 +    final Set<ExternalCompactionId> idsInMetadata = 
readExternalCompactionIds();
 +    LOG.trace("Current ECIDs in metadata: {}", idsInMetadata.size());
 +    LOG.trace("Current ECIDs in running cache: {}", idsSnapshot.size());
 +
 +    final Set<ExternalCompactionId> idsToRemove = 
Sets.difference(idsSnapshot, idsInMetadata);
 +
 +    // remove ids that are in the running set but not in the metadata table
 +    idsToRemove.forEach(this::recordCompletion);
 +    if (idsToRemove.size() > 0) {
 +      LOG.debug("Removed stale entries from RUNNING_CACHE : {}", idsToRemove);
 +    }
 +
 +    // Get the set of groups being referenced in the current configuration
 +    Set<ResourceGroupId> groupsInConfiguration = null;
 +    try {
 +      groupsInConfiguration = getCompactionServicesConfigurationGroups();
 +    } catch (RuntimeException | ReflectiveOperationException e) {
 +      LOG.error(
 +          "Error getting groups from the compaction services configuration. 
Unable to clean up internal state.",
 +          e);
 +      return;
 +    }
 +
 +    // Compaction jobs are created in the TabletGroupWatcher and added to the 
Coordinator
 +    // via the addJobs method which adds the job to the CompactionJobQueues 
object.
 +    final Set<ResourceGroupId> groupsWithJobs = jobQueues.getQueueIds();
 +
 +    final Set<ResourceGroupId> jobGroupsNotInConfiguration =
 +        Sets.difference(groupsWithJobs, groupsInConfiguration);
 +
 +    if (jobGroupsNotInConfiguration != null && 
!jobGroupsNotInConfiguration.isEmpty()) {
 +      RUNNING_CACHE.values().forEach(rc -> {
 +        if 
(jobGroupsNotInConfiguration.contains(ResourceGroupId.of(rc.getGroup().canonical())))
 {
 +          LOG.warn(
 +              "External compaction {} running in group {} on compactor {},"
 +                  + " but group not found in current configuration. Failing 
compaction...",
 +              rc.getJob().getExternalCompactionId(), rc.getGroup(), 
rc.getCompactorAddress());
 +          cancelCompactionOnCompactor(rc.getCompactorAddress(),
 +              rc.getJob().getExternalCompactionId());
 +        }
 +      });
 +
 +      final Set<ResourceGroupId> trackedGroups = 
Set.copyOf(TIME_COMPACTOR_LAST_CHECKED.keySet());
 +      TIME_COMPACTOR_LAST_CHECKED.keySet().retainAll(groupsInConfiguration);
 +      LOG.debug("No longer tracking compactor check-in times for groups: {}",
 +          Sets.difference(trackedGroups, 
TIME_COMPACTOR_LAST_CHECKED.keySet()));
 +    }
 +
 +    final Set<ServerId> runningCompactors = getRunningCompactors();
 +
 +    final Set<ResourceGroupId> runningCompactorGroups = new HashSet<>();
 +    runningCompactors.forEach(
 +        c -> 
runningCompactorGroups.add(ResourceGroupId.of(c.getResourceGroup().canonical())));
 +
 +    final Set<ResourceGroupId> groupsWithNoCompactors =
 +        Sets.difference(groupsInConfiguration, runningCompactorGroups);
 +    if (groupsWithNoCompactors != null && !groupsWithNoCompactors.isEmpty()) {
 +      for (ResourceGroupId group : groupsWithNoCompactors) {
 +        long queuedJobCount = jobQueues.getQueuedJobs(group);
 +        if (queuedJobCount > 0) {
 +          LOG.warn("Compactor group {} has {} queued compactions but no 
running compactors", group,
 +              queuedJobCount);
 +        }
 +      }
 +    }
 +
 +    final Set<ResourceGroupId> compactorsWithNoGroups =
 +        Sets.difference(runningCompactorGroups, groupsInConfiguration);
 +    if (compactorsWithNoGroups != null && !compactorsWithNoGroups.isEmpty()) {
 +      LOG.warn(
 +          "The following groups have running compactors, but are not in the 
current configuration: {}",
 +          compactorsWithNoGroups);
 +    }
 +
 +    final long now = System.currentTimeMillis();
 +    final long warningTime = getMissingCompactorWarningTime();
 +    Map<String,Set<HostAndPort>> idleCompactors = 
getIdleCompactors(runningCompactors);
 +    for (ResourceGroupId groupName : groupsInConfiguration) {
 +      long lastCheckTime =
 +          TIME_COMPACTOR_LAST_CHECKED.getOrDefault(groupName, 
coordinatorStartTime);
 +      if ((now - lastCheckTime) > warningTime && 
jobQueues.getQueuedJobs(groupName) > 0
 +          && idleCompactors.containsKey(groupName.canonical())) {
 +        LOG.warn(
 +            "The group {} has queued jobs and {} idle compactors, however 
none have checked in "
 +                + "with coordinator for {}ms",
 +            groupName, idleCompactors.get(groupName.canonical()).size(), 
warningTime);
 +      }
 +    }
 +  }
 +}
diff --cc 
server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
index 1bc3f51303,ca9901a7ae..186a53e299
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
@@@ -328,8 -247,8 +327,8 @@@ public class UpgradeCoordinator 
        throw new IllegalStateException("Error checking properties", e);
      }
      try {
-       CheckCompactionConfig.validate(context.getConfiguration(), Level.INFO);
 -      CheckCompactionConfig.validate(context.getConfiguration());
 -    } catch (SecurityException | IllegalArgumentException | 
ReflectiveOperationException e) {
++      CheckCompactionConfig.validate(context.getConfiguration(), 
Logger::info);
 +    } catch (RuntimeException | ReflectiveOperationException e) {
        throw new IllegalStateException("Error validating compaction 
configuration", e);
      }
    }

Reply via email to