This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit c4385208de898278460b8f9063d0245b89d38dcf
Merge: 4777675d7e 9fee5991fb
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Fri Jan 31 19:53:34 2025 +0000

    Merge branch '3.1'

 .../accumulo/core/lock/ServiceLockSupport.java     |   30 +-
 .../accumulo/core/rpc/clients/ManagerClient.java   |   10 +-
 ....java => ServerProcessServiceThriftClient.java} |   35 +-
 .../core/rpc/clients/ThriftClientTypes.java        |    3 +
 .../core/util/threads/ThreadPoolNames.java         |    1 +
 core/src/main/scripts/generate-thrift.sh           |    2 +-
 core/src/main/spotbugs/exclude-filter.xml          |    1 +
 .../core/manager/thrift/ManagerClientService.java  | 1271 ++++++++++++++++++++
 .../core/process/thrift/ServerProcessService.java  |  619 ++++++++++
 core/src/main/thrift/manager.thrift                |    9 +
 core/src/main/thrift/process.thrift                |   31 +
 .../MiniAccumuloClusterControl.java                |   64 +
 .../org/apache/accumulo/server/AbstractServer.java |   66 +-
 .../accumulo/server/manager/LiveTServerSet.java    |   22 +-
 .../accumulo/server/rpc/ThriftProcessorTypes.java  |   35 +-
 .../org/apache/accumulo/server/util/Admin.java     |   33 +
 .../server/zookeeper/DistributedWorkQueue.java     |   26 +-
 .../org/apache/accumulo/compactor/Compactor.java   |  337 +++---
 .../apache/accumulo/gc/SimpleGarbageCollector.java |  255 ++--
 .../java/org/apache/accumulo/manager/Manager.java  |   32 +-
 .../manager/ManagerClientServiceHandler.java       |   12 +
 .../accumulo/manager/recovery/RecoveryManager.java |    4 +-
 .../apache/accumulo/monitor/EmbeddedWebServer.java |    2 +-
 .../java/org/apache/accumulo/monitor/Monitor.java  |   19 +-
 .../org/apache/accumulo/tserver/ScanServer.java    |   56 +-
 .../org/apache/accumulo/tserver/TabletServer.java  |  116 +-
 .../org/apache/accumulo/tserver/log/LogSorter.java |   17 +-
 .../apache/accumulo/tserver/ScanServerTest.java    |    5 +
 .../tserver/log/RecoveryLogsIteratorTest.java      |   11 +-
 .../tserver/log/SortedLogRecoveryTest.java         |   48 +-
 .../tserver/log/TestUpgradePathForWALogs.java      |   17 +-
 .../accumulo/test/SelfStoppingScanServer.java      |    2 +-
 .../compaction/ExternalDoNothingCompactor.java     |    2 +-
 .../accumulo/test/fate/FateExecutionOrderIT.java   |    1 +
 .../test/functional/GracefulShutdownIT.java        |  295 +++++
 35 files changed, 3066 insertions(+), 423 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/lock/ServiceLockSupport.java
index dda14f50c0,fd7a5bcd85..d4e0cd0b58
--- a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockSupport.java
+++ b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockSupport.java
@@@ -66,17 -40,24 +66,24 @@@ public class ServiceLockSupport 
  
      private static final Logger LOG = LoggerFactory.getLogger(HAServiceLockWatcher.class);
  
 -    private final String serviceName;
 +    private final Type server;
+     private final Supplier<Boolean> shutdownComplete;
      private volatile boolean acquiredLock = false;
      private volatile boolean failedToAcquireLock = false;
  
-     public HAServiceLockWatcher(Type server) {
 -    public HAServiceLockWatcher(String serviceName, Supplier<Boolean> shutdownComplete) {
 -      this.serviceName = serviceName;
++    public HAServiceLockWatcher(Type server, Supplier<Boolean> shutdownComplete) {
 +      this.server = server;
+       this.shutdownComplete = shutdownComplete;
      }
  
      @Override
      public void lostLock(LockLossReason reason) {
-       Halt.halt(server + " lock in zookeeper lost (reason = " + reason + "), exiting!", -1);
+       if (shutdownComplete.get()) {
 -        LOG.warn("{} lost lock (reason = {}), not halting because shutdown is complete.",
 -            serviceName, reason);
++        LOG.warn("{} lost lock (reason = {}), not halting because shutdown is complete.", server,
++            reason);
+       } else {
 -        Halt.halt(serviceName + " lock in zookeeper lost (reason = " + reason + "), exiting!", -1);
++        Halt.halt(server + " lock in zookeeper lost (reason = " + reason + "), exiting!", -1);
+       }
      }
  
      @Override
@@@ -146,25 -128,28 +153,28 @@@
  
      private static final Logger LOG = LoggerFactory.getLogger(ServiceLockWatcher.class);
  
 -    private final String serviceName;
 +    private final Type server;
-     private final Supplier<Boolean> shuttingDown;
+     private final Supplier<Boolean> shutdownComplete;
 -    private final Consumer<String> lostLockAction;
 +    private final Consumer<Type> lostLockAction;
  
-     public ServiceLockWatcher(Type server, Supplier<Boolean> shuttingDown,
 -    public ServiceLockWatcher(String serviceName, Supplier<Boolean> shutdownComplete,
 -        Consumer<String> lostLockAction) {
 -      this.serviceName = serviceName;
++    public ServiceLockWatcher(Type server, Supplier<Boolean> shutdownComplete,
 +        Consumer<Type> lostLockAction) {
 +      this.server = server;
-       this.shuttingDown = shuttingDown;
+       this.shutdownComplete = shutdownComplete;
        this.lostLockAction = lostLockAction;
      }
  
      @Override
      public void lostLock(final LockLossReason reason) {
-       Halt.halt(1, () -> {
-         if (!shuttingDown.get()) {
+       if (shutdownComplete.get()) {
 -        LOG.warn("{} lost lock (reason = {}), not halting because shutdown is complete.",
 -            serviceName, reason);
++        LOG.warn("{} lost lock (reason = {}), not halting because shutdown is complete.", server,
++            reason);
+       } else {
+         Halt.halt(1, () -> {
 -          LOG.error("{} lost lock (reason = {}), exiting.", serviceName, reason);
 -          lostLockAction.accept(serviceName);
 +          LOG.error("{} lost lock (reason = {}), exiting.", server, reason);
-         }
-         lostLockAction.accept(server);
-       });
++          lostLockAction.accept(server);
+         });
+       }
      }
  
      @Override
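
The watcher changes above replace an unconditional halt with a check of shutdown state at callback time. A minimal sketch of why the constructor takes a Supplier<Boolean> rather than a plain boolean (names mirror the diff; the cleanup lambda is illustrative only):

    // The watcher is built at startup, long before shutdown state is known,
    // so the flag must be read lazily when lostLock() actually fires.
    AtomicBoolean shutdownComplete = new AtomicBoolean(false);
    LockWatcher lw = new ServiceLockWatcher(Type.COMPACTOR,
        shutdownComplete::get, // evaluated at callback time, not construction time
        type -> LOG.info("cleaning up after {}", type)); // illustrative action
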
diff --cc core/src/main/java/org/apache/accumulo/core/rpc/clients/ManagerClient.java
index 29057cf3cb,d0076e69f1..5127807e0b
--- a/core/src/main/java/org/apache/accumulo/core/rpc/clients/ManagerClient.java
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/clients/ManagerClient.java
@@@ -44,7 -43,11 +44,15 @@@ public interface ManagerClient<C extend
        return null;
      }
  
-     HostAndPort manager = HostAndPort.fromString(managers.iterator().next().toHostPortString());
 -    HostAndPort manager = HostAndPort.fromString(locations.get(0));
 -    if (manager.getPort() == 0) {
++    final String managerLocation = managers.iterator().next().toHostPortString();
++    if (managerLocation.equals("0.0.0.0:0")) {
++      // The Manager creates the lock with an initial address of 0.0.0.0:0, then
++      // later updates the lock contents with the actual address after everything
++      // is started.
++      log.debug("Manager is up and lock acquired, waiting for address...");
+       return null;
+     }
 -
++    HostAndPort manager = HostAndPort.fromString(managerLocation);
      try {
        // Manager requests can take a long time: don't ever time out
        return ThriftUtil.getClientNoTimeout(type, manager, context);
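
Returning null while the lock still holds the 0.0.0.0:0 placeholder pushes the retry to the caller. A hedged caller-side sketch (the getManagerConnection helper, polling loop, and sleep interval are assumptions, not part of this commit):

    // Poll until the Manager has replaced 0.0.0.0:0 with its real address.
    ManagerClientService.Client client = null;
    while (client == null) {
      client = getManagerConnection(ThriftClientTypes.MANAGER, context); // assumed helper
      if (client == null) {
        Thread.sleep(250); // illustrative back-off before re-reading the lock data
      }
    }
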
diff --cc minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
index 53ffdf1443,b8368c9e61..57164165cf
--- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
+++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
@@@ -24,10 -24,11 +24,12 @@@ import java.io.IOException
  import java.util.ArrayList;
  import java.util.Collections;
  import java.util.HashMap;
++import java.util.HashSet;
 +import java.util.Iterator;
  import java.util.List;
  import java.util.Map;
  import java.util.Map.Entry;
 -import java.util.Optional;
+ import java.util.Set;
  import java.util.concurrent.ExecutionException;
  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.TimeoutException;
@@@ -514,12 -529,67 +516,74 @@@ public class MiniAccumuloClusterContro
      stop(server, hostname);
    }
  
 +  public List<Process> getCompactors(String resourceGroup) {
 +    return compactorProcesses.get(resourceGroup);
 +  }
 +
 +  public List<Process> getTabletServers(String resourceGroup) {
 +    return tabletServerProcesses.get(resourceGroup);
 +  }
 +
+   public void refreshProcesses(ServerType type) {
+     switch (type) {
 -      case COMPACTION_COORDINATOR:
 -        if (!coordinatorProcess.isAlive()) {
 -          coordinatorProcess = null;
 -        }
 -        break;
+       case COMPACTOR:
 -        compactorProcesses.removeIf(process -> !process.isAlive());
++        compactorProcesses.forEach((k, v) -> v.removeIf(process -> !process.isAlive()));
+         break;
+       case GARBAGE_COLLECTOR:
+         if (!gcProcess.isAlive()) {
+           gcProcess = null;
+         }
+         break;
+       case MANAGER:
+         if (!managerProcess.isAlive()) {
+           managerProcess = null;
+         }
+         break;
+       case MONITOR:
+         if (!monitor.isAlive()) {
+           monitor = null;
+         }
+         break;
+       case SCAN_SERVER:
 -        scanServerProcesses.removeIf(process -> !process.isAlive());
++        scanServerProcesses.forEach((k, v) -> v.removeIf(process -> !process.isAlive()));
+         break;
+       case TABLET_SERVER:
 -        tabletServerProcesses.removeIf(process -> !process.isAlive());
++        tabletServerProcesses.forEach((k, v) -> v.removeIf(process -> !process.isAlive()));
+         break;
+       case ZOOKEEPER:
+         if (!zooKeeperProcess.isAlive()) {
+           zooKeeperProcess = null;
+         }
+         break;
+       default:
+         throw new IllegalArgumentException("Unhandled type: " + type);
+     }
+   }
+ 
+   public Set<Process> getProcesses(ServerType type) {
+     switch (type) {
 -      case COMPACTION_COORDINATOR:
 -        return coordinatorProcess == null ? Set.of() : Set.of(coordinatorProcess);
+       case COMPACTOR:
 -        return Set.copyOf(compactorProcesses);
++        Set<Process> cprocesses = new HashSet<>();
++        compactorProcesses.values().forEach(list -> list.forEach(cprocesses::add));
++        return cprocesses;
+       case GARBAGE_COLLECTOR:
+         return gcProcess == null ? Set.of() : Set.of(gcProcess);
+       case MANAGER:
+         return managerProcess == null ? Set.of() : Set.of(managerProcess);
+       case MONITOR:
+         return monitor == null ? Set.of() : Set.of(monitor);
+       case SCAN_SERVER:
 -        return Set.copyOf(scanServerProcesses);
++        Set<Process> sprocesses = new HashSet<>();
++        scanServerProcesses.values().forEach(list -> list.forEach(sprocesses::add));
++        return sprocesses;
+       case TABLET_SERVER:
 -        return Set.copyOf(tabletServerProcesses);
++        Set<Process> tprocesses = new HashSet<>();
++        tabletServerProcesses.values().forEach(list -> list.forEach(tprocesses::add));
++        return tprocesses;
+       case ZOOKEEPER:
+         return zooKeeperProcess == null ? Set.of() : Set.of(zooKeeperProcess);
+       default:
+         throw new IllegalArgumentException("Unhandled type: " + type);
+     }
+   }
  }
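
getProcesses now flattens a Map<String,List<Process>> (keyed by resource group) into a Set. An equivalent stream-based sketch of that flattening, assuming the same compactorProcesses field:

    // Same result as the forEach-into-a-HashSet in the diff, written as a stream.
    Set<Process> cprocesses = compactorProcesses.values().stream()
        .flatMap(List::stream)
        .collect(java.util.stream.Collectors.toSet());
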
diff --cc server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
index 2eb814ac23,115cbb424f..268d8f5dbb
--- a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
@@@ -20,21 -20,21 +20,25 @@@ package org.apache.accumulo.server
  
  import static java.util.concurrent.TimeUnit.MILLISECONDS;
  
 +import java.util.List;
  import java.util.OptionalInt;
  import java.util.concurrent.ScheduledFuture;
+ import java.util.concurrent.atomic.AtomicBoolean;
  import java.util.concurrent.atomic.AtomicReference;
 +import java.util.function.Function;
  
  import org.apache.accumulo.core.Constants;
  import org.apache.accumulo.core.classloader.ClassLoaderUtil;
  import org.apache.accumulo.core.cli.ConfigOpts;
+ import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
  import org.apache.accumulo.core.conf.AccumuloConfiguration;
  import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.conf.SiteConfiguration;
 +import org.apache.accumulo.core.conf.cluster.ClusterConfigParser;
  import org.apache.accumulo.core.lock.ServiceLock;
  import org.apache.accumulo.core.metrics.MetricsProducer;
+ import org.apache.accumulo.core.process.thrift.ServerProcessService;
+ import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
  import org.apache.accumulo.core.trace.TraceUtil;
  import org.apache.accumulo.core.util.Halt;
  import org.apache.accumulo.core.util.Timer;
@@@ -62,9 -62,10 +67,11 @@@ public abstract class AbstractServe
    private volatile Timer idlePeriodTimer = null;
    private volatile Thread serverThread;
    private volatile Thread verificationThread;
+   private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
+   private final AtomicBoolean shutdownComplete = new AtomicBoolean(false);
  
 -  protected AbstractServer(String appName, ConfigOpts opts, String[] args) {
 +  protected AbstractServer(String appName, ConfigOpts opts,
 +      Function<SiteConfiguration,ServerContext> serverContextFactory, String[] args) {
      this.applicationName = appName;
      opts.parseArgs(appName, args);
      var siteConfig = opts.getSiteConfiguration();
@@@ -120,16 -119,62 +127,70 @@@
      }
    }
  
 +  protected String getResourceGroupPropertyValue(SiteConfiguration conf) {
 +    return Constants.DEFAULT_RESOURCE_GROUP_NAME;
 +  }
 +
 +  public String getResourceGroup() {
 +    return resourceGroup;
 +  }
 +
+   @Override
+   public void gracefulShutdown(TCredentials credentials) {
+ 
+     try {
+       if (!context.getSecurityOperation().canPerformSystemActions(credentials)) {
+         log.warn("Ignoring shutdown request, user " + credentials.getPrincipal()
+             + " does not have the appropriate permissions.");
+         return;
+       }
+     } catch (ThriftSecurityException e) {
+       log.error(
+           "Error trying to determine if user has permissions to shutdown server, ignoring request",
+           e);
+       return;
+     }
+ 
+     if (shutdownRequested.compareAndSet(false, true)) {
+       // Don't interrupt the server thread, that will cause
+       // IO operations to fail as the servers are finishing
+       // their work.
+       log.info("Graceful shutdown initiated.");
+     } else {
+       log.warn("Graceful shutdown previously requested.");
+     }
+   }
+ 
+   public boolean isShutdownRequested() {
+     return shutdownRequested.get();
+   }
+ 
+   public AtomicBoolean getShutdownComplete() {
+     return shutdownComplete;
+   }
+ 
    /**
-    * Run this server in a main thread
 +    * Run this server in a main thread. The server's run method should set up the server, then
 +    * loop until isShutdownRequested() returns true, like so:
+    *
+    * <pre>
+    * public void run() {
+    *   // setup server and start threads
+    *   while (!isShutdownRequested()) {
+    *     if (Thread.currentThread().isInterrupted()) {
 +    *       LOG.info("Server process thread has been interrupted, shutting down");
+    *       break;
+    *     }
+    *     try {
+    *       // sleep or other things
+    *     } catch (InterruptedException e) {
+    *       gracefulShutdown();
+    *     }
+    *   }
+    *   // shut down server
+    *   getShutdownComplete().set(true);
+    *   ServiceLock.unlock(serverLock);
+    * }
+    * </pre>
     */
    public void runServer() throws Exception {
      final AtomicReference<Throwable> err = new AtomicReference<>();
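
The javadoc above fixes the shape of every server's main loop. A minimal conforming run() sketch (the setup/teardown comments and the serverLock field are placeholders, not part of this commit):

    public void run() {
      // ... start threads, acquire the service lock (placeholder setup) ...
      while (!isShutdownRequested()) {
        if (Thread.currentThread().isInterrupted()) {
          LOG.info("Server process thread has been interrupted, shutting down");
          break;
        }
        try {
          Thread.sleep(1000); // periodic housekeeping would go here
        } catch (InterruptedException e) {
          gracefulShutdown(getContext().rpcCreds());
        }
      }
      // ... stop threads, flush state (placeholder teardown) ...
      getShutdownComplete().set(true);
      ServiceLock.unlock(serverLock); // placeholder lock field
    }
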
diff --cc server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
index f20aa6dfb5,9b7057b689..ab593585b0
--- a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
@@@ -27,8 -26,8 +27,9 @@@ import java.util.HashMap
  import java.util.HashSet;
  import java.util.Map;
  import java.util.Map.Entry;
 +import java.util.Optional;
  import java.util.Set;
+ import java.util.concurrent.ConcurrentHashMap;
  import java.util.concurrent.TimeUnit;
  import java.util.function.Supplier;
  
@@@ -200,14 -230,16 +201,17 @@@ public class LiveTServerSet implements 
      }
    }
  
 -  // The set of active tservers with locks, indexed by their name in zookeeper
 +  // The set of active tservers with locks, indexed by their name in zookeeper. When the contents of
 +  // this map are modified, tServersSnapshot should be set to null.
    private final Map<String,TServerInfo> current = new HashMap<>();
 -  // as above, indexed by TServerInstance
 -  private final Map<TServerInstance,TServerInfo> currentInstances = new HashMap<>();
 +
 +  private LiveTServersSnapshot tServersSnapshot = null;
  
+   private final ConcurrentHashMap<String,TServerInfo> serversShuttingDown =
+       new ConcurrentHashMap<>();
+ 
    // The set of entries in zookeeper without locks, and the first time each was noticed
 -  private final Map<String,Long> locklessServers = new HashMap<>();
 +  private final Map<ServiceLockPath,Long> locklessServers = new HashMap<>();
  
    private final Supplier<ZooCache> zcSupplier;
  
@@@ -259,34 -307,33 +276,35 @@@
    }
  
    private synchronized void checkServer(final Set<TServerInstance> updates,
 -      final Set<TServerInstance> doomed, final String path, final String zPath)
 +      final Set<TServerInstance> doomed, final ServiceLockPath tserverPath)
        throws InterruptedException, KeeperException {
  
 -    TServerInfo info = current.get(zPath);
 +    // invalidate the snapshot, forcing it to be recomputed the next time it's requested
 +    tServersSnapshot = null;
 +
 +    final TServerInfo info = current.get(tserverPath.getServer());
  
 -    final var zLockPath = ServiceLock.path(path + "/" + zPath);
      ZcStat stat = new ZcStat();
 -    HostAndPort address = ServiceLock.getLockData(getZooCache(), zLockPath, stat)
 -        .map(sld -> sld.getAddress(ServiceLockData.ThriftService.TSERV)).orElse(null);
 +    Optional<ServiceLockData> sld = ServiceLock.getLockData(getZooCache(), tserverPath, stat);
  
 -    if (address == null) {
 +    if (sld.isEmpty()) {
        if (info != null) {
          doomed.add(info.instance);
 -        current.remove(zPath);
 -        currentInstances.remove(info.instance);
 -        serversShuttingDown.remove(zPath);
 +        current.remove(tserverPath.getServer());
++        serversShuttingDown.remove(tserverPath.toString());
        }
  
 -      Long firstSeen = locklessServers.get(zPath);
 +      Long firstSeen = locklessServers.get(tserverPath);
        if (firstSeen == null) {
 -        locklessServers.put(zPath, System.currentTimeMillis());
 +        locklessServers.put(tserverPath, System.currentTimeMillis());
      } else if (System.currentTimeMillis() - firstSeen > MINUTES.toMillis(10)) {
 -        deleteServerNode(path + "/" + zPath);
 -        locklessServers.remove(zPath);
 +        deleteServerNode(tserverPath.toString());
 +        locklessServers.remove(tserverPath);
        }
      } else {
 -      locklessServers.remove(zPath);
 +      locklessServers.remove(tserverPath);
 +      HostAndPort address = sld.orElseThrow().getAddress(ServiceLockData.ThriftService.TSERV);
 +      String resourceGroup = sld.orElseThrow().getGroup(ServiceLockData.ThriftService.TSERV);
       TServerInstance instance = new TServerInstance(address, stat.getEphemeralOwner());
  
        if (info == null) {
@@@ -346,68 -397,10 +364,70 @@@
      return tServerInfo.connection;
    }
  
 +  public synchronized String getResourceGroup(TServerInstance server) {
 +    if (server == null) {
 +      return null;
 +    }
 +    TServerInfo tServerInfo = getSnapshot().tserversInfo.get(server);
 +    if (tServerInfo == null) {
 +      return null;
 +    }
 +    return tServerInfo.resourceGroup;
 +  }
 +
 +  public static class LiveTServersSnapshot {
 +    private final Set<TServerInstance> tservers;
 +    private final Map<String,Set<TServerInstance>> tserverGroups;
 +
 +    // TServerInfo is only for internal use, so this field is private w/o a getter.
 +    private final Map<TServerInstance,TServerInfo> tserversInfo;
 +
 +    @VisibleForTesting
 +    public LiveTServersSnapshot(Set<TServerInstance> currentServers,
 +        Map<String,Set<TServerInstance>> serverGroups) {
 +      this.tserversInfo = null;
 +      this.tservers = Set.copyOf(currentServers);
 +      Map<String,Set<TServerInstance>> copy = new HashMap<>();
 +      serverGroups.forEach((k, v) -> copy.put(k, Set.copyOf(v)));
 +      this.tserverGroups = Collections.unmodifiableMap(copy);
 +    }
 +
 +    public LiveTServersSnapshot(Map<TServerInstance,TServerInfo> currentServers,
 +        Map<String,Set<TServerInstance>> serverGroups) {
 +      this.tserversInfo = Map.copyOf(currentServers);
 +      this.tservers = this.tserversInfo.keySet();
 +      Map<String,Set<TServerInstance>> copy = new HashMap<>();
 +      serverGroups.forEach((k, v) -> copy.put(k, Set.copyOf(v)));
 +      this.tserverGroups = Collections.unmodifiableMap(copy);
 +    }
 +
 +    public Set<TServerInstance> getTservers() {
 +      return tservers;
 +    }
 +
 +    public Map<String,Set<TServerInstance>> getTserverGroups() {
 +      return tserverGroups;
 +    }
 +  }
 +
 +  public synchronized LiveTServersSnapshot getSnapshot() {
 +    if (tServersSnapshot == null) {
 +      HashMap<TServerInstance,TServerInfo> tServerInstances = new HashMap<>();
 +      Map<String,Set<TServerInstance>> tserversGroups = new HashMap<>();
 +      current.values().forEach(tServerInfo -> {
 +        tServerInstances.put(tServerInfo.instance, tServerInfo);
 +        tserversGroups.computeIfAbsent(tServerInfo.resourceGroup, rg -> new HashSet<>())
 +            .add(tServerInfo.instance);
 +      });
 +      tServersSnapshot = new LiveTServersSnapshot(tServerInstances, tserversGroups);
 +    }
 +    return tServersSnapshot;
 +  }
 +
    public synchronized Set<TServerInstance> getCurrentServers() {
-     return getSnapshot().getTservers();
 -    Set<TServerInstance> current = currentInstances.keySet();
++    Set<TServerInstance> current = new HashSet<>(getSnapshot().getTservers());
+     serversShuttingDown.values().forEach(tsi -> current.remove(tsi.instance));
 -    return new HashSet<>(current);
++    return current;
    }
  
    public synchronized int size() {
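
tServersSnapshot is an invalidate-on-write cache guarded by the object's monitor: writers null the field, the next reader rebuilds it. A standalone sketch of the idiom with placeholder names:

    private Snapshot cached = null; // placeholder type

    synchronized void mutate() {
      cached = null; // any write to the backing map invalidates the cached view
      // ... modify backing state ...
    }

    synchronized Snapshot snapshot() {
      if (cached == null) {
        cached = rebuild(); // placeholder: recompute from backing state
      }
      return cached;
    }
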
diff --cc server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftProcessorTypes.java
index 6f0a6086eb,59f4e154f5..0a56b4e32f
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftProcessorTypes.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftProcessorTypes.java
@@@ -105,23 -113,39 +113,30 @@@ public class ThriftProcessorTypes<C ext
      return muxProcessor;
    }
  
-   public static TMultiplexedProcessor getGcTProcessor(GCMonitorService.Iface serviceHandler,
 -  public static TMultiplexedProcessor getCoordinatorTProcessor(
 -      ServerProcessService.Iface processHandler, CompactionCoordinatorService.Iface serviceHandler,
--      ServerContext context) {
 -    TMultiplexedProcessor muxProcessor = new TMultiplexedProcessor();
 -    muxProcessor.registerProcessor(SERVER_PROCESS.getServiceName(),
 -        SERVER_PROCESS.getTProcessor(ServerProcessService.Processor.class,
 -            ServerProcessService.Iface.class, processHandler, context));
 -    muxProcessor.registerProcessor(COORDINATOR.getServiceName(),
 -        COORDINATOR.getTProcessor(CompactionCoordinatorService.Processor.class,
 -            CompactionCoordinatorService.Iface.class, serviceHandler, context));
 -    return muxProcessor;
 -  }
 -
+   public static TMultiplexedProcessor getGcTProcessor(ServerProcessService.Iface processHandler,
+       GCMonitorService.Iface serviceHandler, ServerContext context) {
      TMultiplexedProcessor muxProcessor = new TMultiplexedProcessor();
+     muxProcessor.registerProcessor(SERVER_PROCESS.getServiceName(),
+         SERVER_PROCESS.getTProcessor(ServerProcessService.Processor.class,
+             ServerProcessService.Iface.class, processHandler, context));
      muxProcessor.registerProcessor(GC.getServiceName(), GC.getTProcessor(
         GCMonitorService.Processor.class, GCMonitorService.Iface.class, serviceHandler, context));
      return muxProcessor;
    }
  
-   public static TMultiplexedProcessor getManagerTProcessor(FateService.Iface fateServiceHandler,
+   public static TMultiplexedProcessor getManagerTProcessor(
+       ServerProcessService.Iface processHandler, FateService.Iface fateServiceHandler,
 +      CompactionCoordinatorService.Iface coordinatorServiceHandler,
       ManagerClientService.Iface managerServiceHandler, ServerContext context) {
      TMultiplexedProcessor muxProcessor = new TMultiplexedProcessor();
+     muxProcessor.registerProcessor(SERVER_PROCESS.getServiceName(),
+         SERVER_PROCESS.getTProcessor(ServerProcessService.Processor.class,
+             ServerProcessService.Iface.class, processHandler, context));
      muxProcessor.registerProcessor(FATE.getServiceName(), FATE.getTProcessor(
        FateService.Processor.class, FateService.Iface.class, fateServiceHandler, context));
 +    muxProcessor.registerProcessor(COORDINATOR.getServiceName(),
 +        COORDINATOR.getTProcessor(CompactionCoordinatorService.Processor.class,
 +            CompactionCoordinatorService.Iface.class, coordinatorServiceHandler, context));
     muxProcessor.registerProcessor(MANAGER.getServiceName(),
         MANAGER.getTProcessor(ManagerClientService.Processor.class,
             ManagerClientService.Iface.class, managerServiceHandler, context));
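
Each server now registers the shared ServerProcessService next to its own service on a single multiplexed Thrift processor. A stripped-down sketch of that wiring (MyService and its handler are hypothetical):

    // One port, several services: clients pick one via TMultiplexedProtocol.
    TMultiplexedProcessor mux = new TMultiplexedProcessor();
    mux.registerProcessor("ServerProcessService",
        new ServerProcessService.Processor<>(processHandler));
    mux.registerProcessor("MyService", // hypothetical second service
        new MyService.Processor<>(myHandler));
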
diff --cc server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
index 7983b4f92a,047c2c47d9..f8e8df3c52
--- a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
@@@ -65,24 -57,15 +65,25 @@@ import org.apache.accumulo.core.conf.Ac
  import org.apache.accumulo.core.conf.DefaultConfiguration;
  import org.apache.accumulo.core.conf.Property;
  import org.apache.accumulo.core.data.TableId;
 +import org.apache.accumulo.core.dataImpl.KeyExtent;
  import org.apache.accumulo.core.fate.AdminUtil;
 -import org.apache.accumulo.core.fate.FateTxId;
 -import org.apache.accumulo.core.fate.ReadOnlyTStore;
 -import org.apache.accumulo.core.fate.ZooStore;
 +import org.apache.accumulo.core.fate.FateId;
 +import org.apache.accumulo.core.fate.FateInstanceType;
 +import org.apache.accumulo.core.fate.FateStore;
 +import org.apache.accumulo.core.fate.ReadOnlyFateStore;
 +import org.apache.accumulo.core.fate.user.UserFateStore;
 +import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
  import org.apache.accumulo.core.fate.zookeeper.ZooCache;
 +import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
  import org.apache.accumulo.core.lock.ServiceLock;
 +import org.apache.accumulo.core.lock.ServiceLockData;
 +import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
 +import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
  import org.apache.accumulo.core.manager.thrift.FateService;
 +import org.apache.accumulo.core.manager.thrift.TFateId;
  import org.apache.accumulo.core.metadata.AccumuloTable;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+ import org.apache.accumulo.core.process.thrift.ServerProcessService;
  import org.apache.accumulo.core.rpc.ThriftUtil;
  import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
  import org.apache.accumulo.core.security.Authorizations;
@@@ -661,9 -618,28 +675,28 @@@ public class Admin implements KeywordEx
        client -> client.shutdown(TraceUtil.traceInfo(), context.rpcCreds(), tabletServersToo));
    }
  
+   // Visible for tests
+   public static void signalGracefulShutdown(final ClientContext context, String address) {
+ 
+     Objects.requireNonNull(address, "address not set");
+     final HostAndPort hp = HostAndPort.fromString(address);
+     ServerProcessService.Client client = null;
+     try {
+       client = ThriftClientTypes.SERVER_PROCESS.getServerProcessConnection(context, log,
+           hp.getHost(), hp.getPort());
+       client.gracefulShutdown(context.rpcCreds());
+     } catch (TException e) {
+       throw new RuntimeException("Error invoking graceful shutdown for server: " + hp, e);
+     } finally {
+       if (client != null) {
+         ThriftUtil.returnClient(client, context);
+       }
+     }
+   }
+ 
   private static void stopTabletServer(final ClientContext context, List<String> servers,
       final boolean force) throws AccumuloException, AccumuloSecurityException {
 -    if (context.getManagerLocations().isEmpty()) {
 +    if (context.instanceOperations().getServers(ServerId.Type.MANAGER).isEmpty()) {
        log.info("No managers running. Not attempting safe unload of tserver.");
        return;
      }
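
signalGracefulShutdown gives tests a direct RPC path to one server's ServerProcessService. A hypothetical test usage (the host:port literal and the follow-up refresh are illustrative):

    // Ask a single tablet server to shut down cleanly, then prune dead processes.
    Admin.signalGracefulShutdown(context, "localhost:9997");
    cluster.getClusterControl().refreshProcesses(ServerType.TABLET_SERVER);
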
diff --cc server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
index fb374ce229,5617213b13..a52b63945c
--- a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
@@@ -58,9 -58,10 +59,9 @@@ public class DistributedWorkQueue 
  
    private static final Logger log = LoggerFactory.getLogger(DistributedWorkQueue.class);
  
 -  private ThreadPoolExecutor threadPool;
    private final ZooReaderWriter zoo;
    private final String path;
-   private final ServerContext context;
+   private final AbstractServer server;
    private final long timerInitialDelay;
    private final long timerPeriod;
  
@@@ -192,19 -190,17 +198,23 @@@
    }
  
    public ServerContext getContext() {
-     return context;
+     return server.getContext();
+   }
+ 
+   public AbstractServer getServer() {
+     return server;
    }
  
 -  public void startProcessing(final Processor processor, ThreadPoolExecutor executorService)
 -      throws KeeperException, InterruptedException {
 +  public long getCheckInterval() {
 +    return this.timerPeriod;
 +  }
  
 -    threadPool = executorService;
 +  /**
 +   * Finds the children at the path passed in the constructor and calls {@code lookForWork} which
 +   * will attempt to process all of the currently available work
 +   */
 +  public void processExistingWork(final Processor processor, ExecutorService executor,
 +      final int maxThreads, boolean setWatch) throws KeeperException, InterruptedException {
  
      zoo.mkdirs(path);
      zoo.mkdirs(path + "/" + LOCKS_NODE);
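
The rename from startProcessing to processExistingWork makes the semantics explicit: drain the work already queued under the path, then return, instead of installing a permanent watcher. A hedged caller sketch (the workQueue instance, thread count, and setWatch value are assumptions):

    // Assumed usage: process whatever work exists now with up to 4 threads,
    // without leaving a ZooKeeper watch behind (setWatch = false).
    workQueue.processExistingWork(processor, executor, 4, false);
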
diff --cc server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index e61ab5a112,eb926da7cf..a954b687bc
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@@ -274,13 -251,26 +271,13 @@@ public class Compactor extends Abstract
    protected void announceExistence(HostAndPort clientAddress)
        throws KeeperException, InterruptedException {
  
 -    String hostPort = ExternalCompactionUtil.getHostPortString(clientAddress);
 -
 -    ZooReaderWriter zoo = getContext().getZooSession().asReaderWriter();
 -    String compactorQueuePath =
 -        getContext().getZooKeeperRoot() + Constants.ZCOMPACTORS + "/" + this.queueName;
 -    String zPath = compactorQueuePath + "/" + hostPort;
 -
 -    try {
 -      zoo.mkdirs(compactorQueuePath);
 -      zoo.putPersistentData(zPath, new byte[] {}, NodeExistsPolicy.SKIP);
 -    } catch (KeeperException.NoAuthException e) {
 -      LOG.error("Failed to write to ZooKeeper. Ensure that"
 -          + " accumulo.properties, specifically instance.secret, is consistent.");
 -      throw e;
 -    }
 -
 -    compactorLock =
 -        new ServiceLock(getContext().getZooSession(), ServiceLock.path(zPath), compactorId);
 -    LockWatcher lw = new ServiceLockWatcher("compactor", () -> getShutdownComplete().get(),
 -        (name) -> getContext().getLowMemoryDetector().logGCInfo(getConfiguration()));
 +    final ZooReaderWriter zoo = getContext().getZooSession().asReaderWriter();
 +    final ServiceLockPath path =
 +        getContext().getServerPaths().createCompactorPath(getResourceGroup(), clientAddress);
 +    ServiceLockSupport.createNonHaServiceLockPath(Type.COMPACTOR, zoo, path);
 +    compactorLock = new ServiceLock(getContext().getZooSession(), path, compactorId);
-     LockWatcher lw = new ServiceLockWatcher(Type.COMPACTOR, () -> false,
++    LockWatcher lw = new ServiceLockWatcher(Type.COMPACTOR, () -> getShutdownComplete().get(),
 +        (type) -> getContext().getLowMemoryDetector().logGCInfo(getConfiguration()));
  
      try {
        for (int i = 0; i < 25; i++) {
@@@ -320,10 -305,10 +317,10 @@@
     * @throws UnknownHostException host unknown
     */
   protected ServerAddress startCompactorClientService() throws UnknownHostException {
 -    ClientServiceHandler clientHandler =
 -        new ClientServiceHandler(getContext(), new TransactionWatcher(getContext()));
 -    var processor =
 -        ThriftProcessorTypes.getCompactorTProcessor(this, clientHandler, this, getContext());
 +
 +    ClientServiceHandler clientHandler = new ClientServiceHandler(getContext());
-     var processor = ThriftProcessorTypes.getCompactorTProcessor(clientHandler,
++    var processor = ThriftProcessorTypes.getCompactorTProcessor(this, clientHandler,
 +        getCompactorThriftHandlerInterface(), getContext());
      ServerAddress sp = TServerUtils.startServer(getContext(), getHostname(),
         Property.COMPACTOR_CLIENTPORT, processor, this.getClass().getSimpleName(),
         "Thrift Client Server", Property.COMPACTOR_PORTSEARCH, Property.COMPACTOR_MINTHREADS,
@@@ -697,189 -682,184 +694,194 @@@
      try {
  
        final AtomicReference<Throwable> err = new AtomicReference<>();
-       final LogSorter logSorter = new LogSorter(getContext(), getConfiguration());
++      final LogSorter logSorter = new LogSorter(this);
 +      long nextSortLogsCheckTime = System.currentTimeMillis();
  
-       while (!shutdown) {
- 
-         // mark compactor as idle while not in the compaction loop
-         updateIdleStatus(true);
- 
-         currentCompactionId.set(null);
-         err.set(null);
-         JOB_HOLDER.reset();
- 
-         if (System.currentTimeMillis() > nextSortLogsCheckTime) {
-           // Attempt to process all existing log sorting work serially in this thread.
-           // When no work remains, this call will return so that we can look for compaction
-           // work.
-           LOG.debug("Checking to see if any recovery logs need sorting");
-           nextSortLogsCheckTime = logSorter.sortLogsIfNeeded();
+       while (!isShutdownRequested()) {
+         if (Thread.currentThread().isInterrupted()) {
+           LOG.info("Server process thread has been interrupted, shutting down");
+           break;
          }
- 
-         TExternalCompactionJob job;
          try {
-           TNextCompactionJob next = getNextJob(getNextId());
-           job = next.getJob();
-           if (!job.isSetExternalCompactionId()) {
-             LOG.trace("No external compactions in group {}", this.getResourceGroup());
-             UtilWaitThread.sleep(getWaitTimeBetweenCompactionChecks(next.getCompactorCount()));
-             continue;
+           // mark compactor as idle while not in the compaction loop
+           updateIdleStatus(true);
+ 
+           currentCompactionId.set(null);
+           err.set(null);
+           JOB_HOLDER.reset();
+ 
++          if (System.currentTimeMillis() > nextSortLogsCheckTime) {
++            // Attempt to process all existing log sorting work serially in this thread.
++            // When no work remains, this call will return so that we can look for compaction
++            // work.
++            LOG.debug("Checking to see if any recovery logs need sorting");
++            nextSortLogsCheckTime = logSorter.sortLogsIfNeeded();
 +          }
-           if (!job.getExternalCompactionId().equals(currentCompactionId.get().toString())) {
-             throw new IllegalStateException("Returned eci " + job.getExternalCompactionId()
-                 + " does not match supplied eci " + currentCompactionId.get());
++
+           TExternalCompactionJob job;
+           try {
+             TNextCompactionJob next = getNextJob(getNextId());
+             job = next.getJob();
+             if (!job.isSetExternalCompactionId()) {
 -              LOG.trace("No external compactions in queue {}", this.queueName);
++              LOG.trace("No external compactions in queue {}", this.getResourceGroup());
+               UtilWaitThread.sleep(getWaitTimeBetweenCompactionChecks(next.getCompactorCount()));
+               continue;
+             }
+             if (!job.getExternalCompactionId().equals(currentCompactionId.get().toString())) {
+               throw new IllegalStateException("Returned eci " + job.getExternalCompactionId()
+                   + " does not match supplied eci " + currentCompactionId.get());
+             }
+           } catch (RetriesExceededException e2) {
+             LOG.warn("Retries exceeded getting next job. Retrying...");
+             continue;
            }
-         } catch (RetriesExceededException e2) {
-           LOG.warn("Retries exceeded getting next job. Retrying...");
-           continue;
-         }
-         LOG.debug("Received next compaction job: {}", job);
+           LOG.debug("Received next compaction job: {}", job);
  
-         final LongAdder totalInputEntries = new LongAdder();
-         final LongAdder totalInputBytes = new LongAdder();
-         final CountDownLatch started = new CountDownLatch(1);
-         final CountDownLatch stopped = new CountDownLatch(1);
+           final LongAdder totalInputEntries = new LongAdder();
+           final LongAdder totalInputBytes = new LongAdder();
+           final CountDownLatch started = new CountDownLatch(1);
+           final CountDownLatch stopped = new CountDownLatch(1);
  
-         final FileCompactorRunnable fcr =
-             createCompactionJob(job, totalInputEntries, totalInputBytes, started, stopped, err);
+           final FileCompactorRunnable fcr =
+               createCompactionJob(job, totalInputEntries, totalInputBytes, started, stopped, err);
 
-         final Thread compactionThread =
-             Threads.createThread("Compaction job for tablet " + job.getExtent().toString(), fcr);
+           final Thread compactionThread =
+               Threads.createThread("Compaction job for tablet " + job.getExtent().toString(), fcr);
  
-         JOB_HOLDER.set(job, compactionThread, fcr.getFileCompactor());
+           JOB_HOLDER.set(job, compactionThread, fcr.getFileCompactor());
  
-         try {
-           // mark compactor as busy while compacting
-           updateIdleStatus(false);
- 
-           // Need to call FileCompactorRunnable.initialize after calling JOB_HOLDER.set
-           fcr.initialize();
- 
-           compactionThread.start(); // start the compactionThread
-           started.await(); // wait until the compactor is started
-           final long inputEntries = totalInputEntries.sum();
-           final long waitTime = calculateProgressCheckTime(totalInputBytes.sum());
-           LOG.debug("Progress checks will occur every {} seconds", waitTime);
-           String percentComplete = "unknown";
- 
-           while (!stopped.await(waitTime, TimeUnit.SECONDS)) {
-             List<CompactionInfo> running =
-                 
org.apache.accumulo.server.compaction.FileCompactor.getRunningCompactions();
-             if (!running.isEmpty()) {
-               // Compaction has started. There should only be one in the list
-               CompactionInfo info = running.get(0);
-               if (info != null) {
-                 final long entriesRead = info.getEntriesRead();
-                 final long entriesWritten = info.getEntriesWritten();
-                 if (inputEntries > 0) {
-                   percentComplete = Float.toString((entriesRead / (float) inputEntries) * 100);
-                 }
-                 String message = String.format(
-                     "Compaction in progress, read %d of %d input entries ( %s %s ), written %d entries, paused %d times",
-                     entriesRead, inputEntries, percentComplete, "%", entriesWritten,
-                     info.getTimesPaused());
-                 watcher.run();
-                 try {
-                   LOG.debug("Updating coordinator with compaction progress: {}.", message);
-                   TCompactionStatusUpdate update = new TCompactionStatusUpdate(
-                       TCompactionState.IN_PROGRESS, message, inputEntries, entriesRead,
-                       entriesWritten, fcr.getCompactionAge().toNanos());
-                   updateCompactionState(job, update);
-                 } catch (RetriesExceededException e) {
-                   LOG.warn("Error updating coordinator with compaction progress, error: {}",
-                       e.getMessage());
+           try {
+             // mark compactor as busy while compacting
+             updateIdleStatus(false);
+ 
+             // Need to call FileCompactorRunnable.initialize after calling 
JOB_HOLDER.set
+             fcr.initialize();
+ 
+             compactionThread.start(); // start the compactionThread
+             started.await(); // wait until the compactor is started
+             final long inputEntries = totalInputEntries.sum();
+             final long waitTime = calculateProgressCheckTime(totalInputBytes.sum());
+             LOG.debug("Progress checks will occur every {} seconds", waitTime);
+             String percentComplete = "unknown";
+ 
+             while (!stopped.await(waitTime, TimeUnit.SECONDS)) {
+               List<CompactionInfo> running =
+                   org.apache.accumulo.server.compaction.FileCompactor.getRunningCompactions();
+               if (!running.isEmpty()) {
+                 // Compaction has started. There should only be one in the list
+                 CompactionInfo info = running.get(0);
+                 if (info != null) {
+                   final long entriesRead = info.getEntriesRead();
+                   final long entriesWritten = info.getEntriesWritten();
+                   if (inputEntries > 0) {
+                     percentComplete = Float.toString((entriesRead / (float) inputEntries) * 100);
+                   }
+                   String message = String.format(
+                       "Compaction in progress, read %d of %d input entries ( %s %s ), written %d entries",
+                       entriesRead, inputEntries, percentComplete, "%", entriesWritten);
+                   watcher.run();
+                   try {
+                     LOG.debug("Updating coordinator with compaction progress: {}.", message);
+                     TCompactionStatusUpdate update = new TCompactionStatusUpdate(
+                         TCompactionState.IN_PROGRESS, message, inputEntries, entriesRead,
+                         entriesWritten, fcr.getCompactionAge().toNanos());
+                     updateCompactionState(job, update);
+                   } catch (RetriesExceededException e) {
+                     LOG.warn("Error updating coordinator with compaction progress, error: {}",
+                         e.getMessage());
+                   }
                  }
+               } else {
+                 LOG.debug("Waiting on compaction thread to finish, but no RUNNING compaction");
                }
-             } else {
-               LOG.debug("Waiting on compaction thread to finish, but no RUNNING compaction");
              }
-           }
-           compactionThread.join();
-           LOG.trace("Compaction thread finished.");
-           // Run the watcher again to clear out the finished compaction and set the
-           // stuck count to zero.
-           watcher.run();
- 
-           if (err.get() != null) {
-             // maybe the error occurred because the table was deleted or something like that, so
-             // force a cancel check to possibly reduce noise in the logs
-             checkIfCanceled();
-           }
- 
-           if (compactionThread.isInterrupted() || JOB_HOLDER.isCancelled()
-               || (err.get() != null && err.get().getClass().equals(InterruptedException.class))) {
-             LOG.warn("Compaction thread was interrupted, sending CANCELLED state");
-             try {
-               TCompactionStatusUpdate update =
-                   new TCompactionStatusUpdate(TCompactionState.CANCELLED, "Compaction cancelled",
-                       -1, -1, -1, fcr.getCompactionAge().toNanos());
-               updateCompactionState(job, update);
-               updateCompactionFailed(job);
-             } catch (RetriesExceededException e) {
-               LOG.error("Error updating coordinator with compaction cancellation.", e);
-             } finally {
-               currentCompactionId.set(null);
+             compactionThread.join();
+             LOG.trace("Compaction thread finished.");
+             // Run the watcher again to clear out the finished compaction and set the
+             // stuck count to zero.
+             watcher.run();
+ 
+             if (err.get() != null) {
+               // maybe the error occurred because the table was deleted or something like that, so
+               // force a cancel check to possibly reduce noise in the logs
+               checkIfCanceled();
              }
-           } else if (err.get() != null) {
-             KeyExtent fromThriftExtent = KeyExtent.fromThrift(job.getExtent());
-             try {
-               LOG.info("Updating coordinator with compaction failure: id: {}, extent: {}",
-                   job.getExternalCompactionId(), fromThriftExtent);
-               TCompactionStatusUpdate update = new TCompactionStatusUpdate(TCompactionState.FAILED,
-                   "Compaction failed due to: " + err.get().getMessage(), -1, -1, -1,
-                   fcr.getCompactionAge().toNanos());
-               updateCompactionState(job, update);
-               updateCompactionFailed(job);
-             } catch (RetriesExceededException e) {
-               LOG.error("Error updating coordinator with compaction failure: id: {}, extent: {}",
-                   job.getExternalCompactionId(), fromThriftExtent, e);
-             } finally {
-               currentCompactionId.set(null);
-             }
-           } else {
-             try {
-               LOG.trace("Updating coordinator with compaction completion.");
-               updateCompactionCompleted(job, JOB_HOLDER.getStats());
-             } catch (RetriesExceededException e) {
-               LOG.error(
-                   "Error updating coordinator with compaction completion, 
cancelling compaction.",
-                   e);
+ 
+             if (compactionThread.isInterrupted() || JOB_HOLDER.isCancelled()
+                 || (err.get() != null && err.get().getClass().equals(InterruptedException.class))) {
+               LOG.warn("Compaction thread was interrupted, sending CANCELLED state");
+               try {
+                 TCompactionStatusUpdate update =
+                     new TCompactionStatusUpdate(TCompactionState.CANCELLED, "Compaction cancelled",
+                         -1, -1, -1, fcr.getCompactionAge().toNanos());
+                 updateCompactionState(job, update);
+                 updateCompactionFailed(job);
+               } catch (RetriesExceededException e) {
+                 LOG.error("Error updating coordinator with compaction cancellation.", e);
+               } finally {
+                 currentCompactionId.set(null);
+               }
+             } else if (err.get() != null) {
+               KeyExtent fromThriftExtent = KeyExtent.fromThrift(job.getExtent());
+               try {
+                 LOG.info("Updating coordinator with compaction failure: id: {}, extent: {}",
+                     job.getExternalCompactionId(), fromThriftExtent);
+                 TCompactionStatusUpdate update = new TCompactionStatusUpdate(
+                     TCompactionState.FAILED, "Compaction failed due to: " + err.get().getMessage(),
+                     -1, -1, -1, fcr.getCompactionAge().toNanos());
+                 updateCompactionState(job, update);
+                 updateCompactionFailed(job);
+               } catch (RetriesExceededException e) {
+                 LOG.error("Error updating coordinator with compaction failure: id: {}, extent: {}",
+                     job.getExternalCompactionId(), fromThriftExtent, e);
+               } finally {
+                 currentCompactionId.set(null);
+               }
+             } else {
                try {
-                 cancel(job.getExternalCompactionId());
-               } catch (TException e1) {
-                 LOG.error("Error cancelling compaction.", e1);
+                 LOG.trace("Updating coordinator with compaction completion.");
+                 updateCompactionCompleted(job, JOB_HOLDER.getStats());
+               } catch (RetriesExceededException e) {
+                 LOG.error(
+                     "Error updating coordinator with compaction completion, cancelling compaction.",
+                     e);
+                 try {
+                   cancel(job.getExternalCompactionId());
+                 } catch (TException e1) {
+                   LOG.error("Error cancelling compaction.", e1);
+                 }
+               } finally {
+                 currentCompactionId.set(null);
                }
-             } finally {
-               currentCompactionId.set(null);
              }
-           }
-         } catch (RuntimeException e1) {
-           LOG.error(
-               "Compactor thread was interrupted waiting for compaction to start, cancelling job",
-               e1);
-           try {
-             cancel(job.getExternalCompactionId());
-           } catch (TException e2) {
-             LOG.error("Error cancelling compaction.", e2);
-           }
-         } finally {
-           currentCompactionId.set(null);
+           } catch (RuntimeException e1) {
+             LOG.error(
+                 "Compactor thread was interrupted waiting for compaction to start, cancelling job",
+                 e1);
+             try {
+               cancel(job.getExternalCompactionId());
+             } catch (TException e2) {
+               LOG.error("Error cancelling compaction.", e2);
+             }
+           } finally {
+             currentCompactionId.set(null);
  
-           // mark compactor as idle after compaction completes
-           updateIdleStatus(true);
+             // mark compactor as idle after compaction completes
+             updateIdleStatus(true);
  
-           // In the case where there is an error in the foreground code the background compaction
-           // may still be running. Must cancel it before starting another iteration of the loop to
-           // avoid multiple threads updating shared state.
-           while (compactionThread.isAlive()) {
-             compactionThread.interrupt();
-             compactionThread.join(1000);
+             // In the case where there is an error in the foreground code the background compaction
+             // may still be running. Must cancel it before starting another iteration of the loop to
+             // avoid multiple threads updating shared state.
+             while (compactionThread.isAlive()) {
+               compactionThread.interrupt();
+               compactionThread.join(1000);
+             }
            }
+         } catch (InterruptedException e) {
+           LOG.info("Interrupt Exception received, shutting down");
+           gracefulShutdown(getContext().rpcCreds());
          }
- 
-       }
- 
+       } // end while
      } catch (Exception e) {
        LOG.error("Unhandled error occurred in Compactor", e);
      } finally {
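
The progress message above derives its percentage from the summed input entries. The arithmetic as a tiny worked example (the numbers are illustrative only):

    long entriesRead = 1_500; // illustrative
    long inputEntries = 6_000; // illustrative
    String percentComplete = Float.toString((entriesRead / (float) inputEntries) * 100);
    // -> "25.0", logged as "read 1500 of 6000 input entries ( 25.0 % )"
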
diff --cc server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 62449eabb6,a9446397ed..4004326622
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@@ -195,143 -181,140 +195,159 @@@ public class SimpleGarbageCollector ext
            }
          });
  
-     while (true) {
-       Span outerSpan = TraceUtil.startSpan(this.getClass(), "gc");
-       try (Scope outerScope = outerSpan.makeCurrent()) {
-         Span innerSpan = TraceUtil.startSpan(this.getClass(), "loop");
-         try (Scope innerScope = innerSpan.makeCurrent()) {
-           final long tStart = System.nanoTime();
-           try {
-             System.gc(); // make room
+     while (!isShutdownRequested()) {
+       if (Thread.currentThread().isInterrupted()) {
 -        LOG.info("Server process thread has been interrupted, shutting down");
++        log.info("Server process thread has been interrupted, shutting down");
+         break;
+       }
+       try {
+         Span outerSpan = TraceUtil.startSpan(this.getClass(), "gc");
+         try (Scope outerScope = outerSpan.makeCurrent()) {
+           Span innerSpan = TraceUtil.startSpan(this.getClass(), "loop");
+           try (Scope innerScope = innerSpan.makeCurrent()) {
+             final long tStart = System.nanoTime();
+             try {
+               System.gc(); // make room
+ 
+               status.current.started = System.currentTimeMillis();
+               var rootGC = new GCRun(DataLevel.ROOT, getContext());
+               var mdGC = new GCRun(DataLevel.METADATA, getContext());
+               var userGC = new GCRun(DataLevel.USER, getContext());
+ 
+               log.info("Starting Root table Garbage Collection.");
+               status.current.bulks += new GarbageCollectionAlgorithm().collect(rootGC);
+               incrementStatsForRun(rootGC);
+               logStats();
+ 
+               log.info("Starting Metadata table Garbage Collection.");
+               status.current.bulks += new GarbageCollectionAlgorithm().collect(mdGC);
+               incrementStatsForRun(mdGC);
+               logStats();
+ 
+               log.info("Starting User table Garbage Collection.");
+               status.current.bulks += new GarbageCollectionAlgorithm().collect(userGC);
+               incrementStatsForRun(userGC);
+               logStats();
+ 
+             } catch (Exception e) {
+               TraceUtil.setException(innerSpan, e, false);
+               log.error("{}", e.getMessage(), e);
+             } finally {
+               status.current.finished = System.currentTimeMillis();
+               status.last = status.current;
+               gcCycleMetrics.setLastCollect(status.current);
+               status.current = new GcCycleStats();
+             }
  
-             status.current.started = System.currentTimeMillis();
-             var rootGC = new GCRun(DataLevel.ROOT, getContext());
-             var mdGC = new GCRun(DataLevel.METADATA, getContext());
-             var userGC = new GCRun(DataLevel.USER, getContext());
+             final long tStop = System.nanoTime();
+             log.info(String.format("Collect cycle took %.2f seconds",
+                 (TimeUnit.NANOSECONDS.toMillis(tStop - tStart) / 1000.0)));
+ 
+             // Clean up any unused write-ahead logs
+             Span walSpan = TraceUtil.startSpan(this.getClass(), "walogs");
+             try (Scope walScope = walSpan.makeCurrent()) {
+               GarbageCollectWriteAheadLogs walogCollector =
+                   new GarbageCollectWriteAheadLogs(getContext(), fs, liveTServerSet);
+               log.info("Beginning garbage collection of write-ahead logs");
+               walogCollector.collect(status);
+               gcCycleMetrics.setLastWalCollect(status.lastLog);
+             } catch (Exception e) {
+               TraceUtil.setException(walSpan, e, false);
+               log.error("{}", e.getMessage(), e);
+             } finally {
+               walSpan.end();
+             }
+           } catch (Exception e) {
+             TraceUtil.setException(innerSpan, e, true);
+             throw e;
+           } finally {
+             innerSpan.end();
+           }
  
-             log.info("Starting Root table Garbage Collection.");
-             status.current.bulks += new GarbageCollectionAlgorithm().collect(rootGC);
-             incrementStatsForRun(rootGC);
-             logStats();
+           // we just made a lot of metadata changes: flush them out
+           try {
+             AccumuloClient accumuloClient = getContext();
+ 
+             final long actionStart = System.nanoTime();
+ 
+             String action = getConfiguration().get(Property.GC_USE_FULL_COMPACTION);
+             log.debug("gc post action {} started", action);
+ 
+             switch (action) {
+               case "compact":
+                 accumuloClient.tableOperations().compact(AccumuloTable.METADATA.name(), null, null,
+                     true, true);
+                 accumuloClient.tableOperations().compact(AccumuloTable.ROOT.name(), null, null,
+                     true, true);
+                 break;
+               case "flush":
+                 accumuloClient.tableOperations().flush(AccumuloTable.METADATA.name(), null, null,
+                     true);
+                 accumuloClient.tableOperations().flush(AccumuloTable.ROOT.name(), null, null, true);
+                 break;
+               default:
+                 log.trace("'none - no action' or invalid value provided: {}", action);
+             }
  
-             log.info("Starting Metadata table Garbage Collection.");
-             status.current.bulks += new GarbageCollectionAlgorithm().collect(mdGC);
-             incrementStatsForRun(mdGC);
-             logStats();
+             final long actionComplete = System.nanoTime();
  
-             log.info("Starting User table Garbage Collection.");
-             status.current.bulks += new GarbageCollectionAlgorithm().collect(userGC);
-             incrementStatsForRun(userGC);
-             logStats();
+             gcCycleMetrics.setPostOpDurationNanos(actionComplete - actionStart);
  
-           } catch (Exception e) {
-             TraceUtil.setException(innerSpan, e, false);
-             log.error("{}", e.getMessage(), e);
-           } finally {
-             status.current.finished = System.currentTimeMillis();
-             status.last = status.current;
-             gcCycleMetrics.setLastCollect(status.current);
-             status.current = new GcCycleStats();
-           }
+             log.info("gc post action {} completed in {} seconds", action, 
String.format("%.2f",
+                 (TimeUnit.NANOSECONDS.toMillis(actionComplete - actionStart) 
/ 1000.0)));
  
-           final long tStop = System.nanoTime();
-           log.info(String.format("Collect cycle took %.2f seconds",
-               (TimeUnit.NANOSECONDS.toMillis(tStop - tStart) / 1000.0)));
- 
-           // Clean up any unused write-ahead logs
-           Span walSpan = TraceUtil.startSpan(this.getClass(), "walogs");
-           try (Scope walScope = walSpan.makeCurrent()) {
-             GarbageCollectWriteAheadLogs walogCollector =
-                 new GarbageCollectWriteAheadLogs(getContext(), fs, liveTServerSet);
-             log.info("Beginning garbage collection of write-ahead logs");
-             walogCollector.collect(status);
-             gcCycleMetrics.setLastWalCollect(status.lastLog);
            } catch (Exception e) {
-             TraceUtil.setException(walSpan, e, false);
-             log.error("{}", e.getMessage(), e);
-           } finally {
-             walSpan.end();
+             TraceUtil.setException(outerSpan, e, false);
+             log.warn("{}", e.getMessage(), e);
            }
          } catch (Exception e) {
-           TraceUtil.setException(innerSpan, e, true);
+           TraceUtil.setException(outerSpan, e, true);
            throw e;
          } finally {
-           innerSpan.end();
+           outerSpan.end();
          }
- 
-         // we just made a lot of metadata changes: flush them out
          try {
-           AccumuloClient accumuloClient = getContext();
- 
-           final long actionStart = System.nanoTime();
- 
-           String action = getConfiguration().get(Property.GC_USE_FULL_COMPACTION);
-           log.debug("gc post action {} started", action);
- 
-           switch (action) {
-             case "compact":
-               accumuloClient.tableOperations().compact(AccumuloTable.METADATA.tableName(), null,
-                   null, true, true);
-               accumuloClient.tableOperations().compact(AccumuloTable.ROOT.tableName(), null, null,
-                   true, true);
-               break;
-             case "flush":
-               accumuloClient.tableOperations().flush(AccumuloTable.METADATA.tableName(), null, null,
-                   true);
-               accumuloClient.tableOperations().flush(AccumuloTable.ROOT.tableName(), null, null,
-                   true);
-               break;
-             default:
-               log.trace("'none - no action' or invalid value provided: {}", action);
-           }
  
-           final long actionComplete = System.nanoTime();
- 
-           gcCycleMetrics.setPostOpDurationNanos(actionComplete - actionStart);
- 
-           log.info("gc post action {} completed in {} seconds", action, 
String.format("%.2f",
-               (TimeUnit.NANOSECONDS.toMillis(actionComplete - actionStart) / 
1000.0)));
- 
-         } catch (Exception e) {
-           TraceUtil.setException(outerSpan, e, false);
-           log.warn("{}", e.getMessage(), e);
-         }
-       } catch (Exception e) {
-         TraceUtil.setException(outerSpan, e, true);
-         throw e;
-       } finally {
-         outerSpan.end();
-       }
-       try {
- 
-         gcCycleMetrics.incrementRunCycleCount();
-         long gcDelay = getConfiguration().getTimeInMillis(Property.GC_CYCLE_DELAY);
- 
-         if (lastCompactorCheck.hasElapsed(gcDelay * 3, MILLISECONDS)) {
-           Map<String,Set<TableId>> resourceMapping = new HashMap<>();
-           for (TableId tid : AccumuloTable.allTableIds()) {
-             TableConfiguration tconf = getContext().getTableConfiguration(tid);
-             String resourceGroup = tconf.get(TableLoadBalancer.TABLE_ASSIGNMENT_GROUP_PROPERTY);
-             resourceGroup =
-                 resourceGroup == null ? Constants.DEFAULT_RESOURCE_GROUP_NAME : resourceGroup;
-             resourceMapping.getOrDefault(resourceGroup, new HashSet<>()).add(tid);
-           }
-           for (Entry<String,Set<TableId>> e : resourceMapping.entrySet()) {
-             if (ExternalCompactionUtil.countCompactors(e.getKey(), getContext()) == 0) {
-               log.warn("No Compactors exist in resource group {} for system table {}", e.getKey(),
-                   e.getValue());
+           gcCycleMetrics.incrementRunCycleCount();
+           long gcDelay = getConfiguration().getTimeInMillis(Property.GC_CYCLE_DELAY);
++
++          if (lastCompactorCheck.hasElapsed(gcDelay * 3, MILLISECONDS)) {
++            Map<String,Set<TableId>> resourceMapping = new HashMap<>();
++            for (TableId tid : AccumuloTable.allTableIds()) {
++              TableConfiguration tconf = getContext().getTableConfiguration(tid);
++              String resourceGroup = tconf.get(TableLoadBalancer.TABLE_ASSIGNMENT_GROUP_PROPERTY);
++              resourceGroup =
++                  resourceGroup == null ? Constants.DEFAULT_RESOURCE_GROUP_NAME : resourceGroup;
++              resourceMapping.getOrDefault(resourceGroup, new HashSet<>()).add(tid);
++            }
++            for (Entry<String,Set<TableId>> e : resourceMapping.entrySet()) {
++              if (ExternalCompactionUtil.countCompactors(e.getKey(), getContext()) == 0) {
++                log.warn("No Compactors exist in resource group {} for system table {}", e.getKey(),
++                    e.getValue());
++              }
 +            }
++            lastCompactorCheck.restart();
 +          }
-           lastCompactorCheck.restart();
-         }
 +
-         log.debug("Sleeping for {} milliseconds", gcDelay);
-         Thread.sleep(gcDelay);
+           log.debug("Sleeping for {} milliseconds", gcDelay);
+           Thread.sleep(gcDelay);
+         } catch (InterruptedException e) {
+           log.warn("{}", e.getMessage(), e);
+           throw e;
+         }
        } catch (InterruptedException e) {
-         log.warn("{}", e.getMessage(), e);
-         return;
+         log.info("Interrupt Exception received, shutting down");
+         gracefulShutdown(getContext().rpcCreds());
        }
      }
+     getShutdownComplete().set(true);
+     log.info("stop requested. exiting ... ");
+     try {
+       gcLock.unlock();
+     } catch (Exception e) {
+       log.warn("Failed to release GarbageCollector lock", e);
+     }
+ 
    }
  
    private void incrementStatsForRun(GCRun gcRun) {
@@@ -371,11 -354,12 +387,12 @@@
  
      UUID zooLockUUID = UUID.randomUUID();
      gcLock = new ServiceLock(getContext().getZooSession(), path, zooLockUUID);
-     HAServiceLockWatcher gcLockWatcher = new HAServiceLockWatcher(Type.GARBAGE_COLLECTOR);
+     HAServiceLockWatcher gcLockWatcher =
 -        new HAServiceLockWatcher("gc", () -> getShutdownComplete().get());
++        new HAServiceLockWatcher(Type.GARBAGE_COLLECTOR, () -> getShutdownComplete().get());
  
      while (true) {
 -      gcLock.lock(gcLockWatcher,
 -          new ServiceLockData(zooLockUUID, addr.toString(), ThriftService.GC));
 +      gcLock.lock(gcLockWatcher, new ServiceLockData(zooLockUUID, addr.toString(), ThriftService.GC,
 +          this.getResourceGroup()));
  
        gcLockWatcher.waitForChange();
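
Note: The reworked GC main loop above is one instance of the graceful-shutdown pattern this merge applies to every server process: poll isShutdownRequested(), break if the thread was interrupted, and turn an InterruptedException into a graceful shutdown request before finally marking shutdown complete and releasing the service lock. A minimal standalone sketch of that control flow, with an AtomicBoolean and doWork() standing in for AbstractServer state and per-server work (illustrative only, not the Accumulo API):

    import java.util.concurrent.atomic.AtomicBoolean;

    class ShutdownLoopSketch {
      // stand-in for AbstractServer.isShutdownRequested()
      private final AtomicBoolean shutdownRequested = new AtomicBoolean();

      // analogue of gracefulShutdown(rpcCreds) in the servers
      void gracefulShutdown() {
        shutdownRequested.set(true);
      }

      // one cycle of server work; may block and be interrupted
      void doWork() throws InterruptedException {
        Thread.sleep(1000);
      }

      void run() {
        while (!shutdownRequested.get()) {
          if (Thread.currentThread().isInterrupted()) {
            break; // another path already initiated shutdown
          }
          try {
            doWork();
          } catch (InterruptedException e) {
            gracefulShutdown(); // an interrupt is treated as a shutdown request
          }
        }
        // cleanup follows: set the shutdown-complete flag, release the service lock
      }
    }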
  
diff --cc server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index eb22227bb9,1fa3de8c88..48a049fcf9
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@@ -1136,8 -1243,8 +1136,8 @@@ public class Manager extends AbstractSe
          HighlyAvailableServiceWrapper.service(managerClientHandler, this);
  
      ServerAddress sa;
-     var processor = ThriftProcessorTypes.getManagerTProcessor(fateServiceHandler,
 -    var processor =
 -        ThriftProcessorTypes.getManagerTProcessor(this, fateServiceHandler, haProxy, getContext());
++    var processor = ThriftProcessorTypes.getManagerTProcessor(this, fateServiceHandler,
 +        compactionCoordinator.getThriftService(), haProxy, getContext());
  
      try {
      sa = TServerUtils.startServer(context, getHostname(), Property.MANAGER_CLIENTPORT, processor,
@@@ -1335,14 -1423,25 +1335,27 @@@
      // The manager is fully initialized. Clients are allowed to connect now.
      managerInitialized.set(true);
  
-     while (clientService.isServing()) {
-       sleepUninterruptibly(500, MILLISECONDS);
+     while (!isShutdownRequested() && clientService.isServing()) {
+       if (Thread.currentThread().isInterrupted()) {
 -        LOG.info("Server process thread has been interrupted, shutting down");
++        log.info("Server process thread has been interrupted, shutting down");
+         break;
+       }
+       try {
+         Thread.sleep(500);
+       } catch (InterruptedException e) {
+         log.info("Interrupt Exception received, shutting down");
+         gracefulShutdown(context.rpcCreds());
+       }
      }
-     log.info("Shutting down fate.");
+ 
 -    LOG.debug("Stopping Thrift Servers");
 -    sa.server.stop();
 -
+     log.debug("Shutting down fate.");
 -    fate().shutdown();
 +    getFateRefs().keySet().forEach(type -> fate(type).shutdown(0, MINUTES));
 +
 +    splitter.stop();
 +
++    log.debug("Stopping Thrift Servers");
++    sa.server.stop();
+ 
      final long deadline = System.currentTimeMillis() + MAX_CLEANUP_WAIT_TIME;
      try {
        statusThread.join(remaining(deadline));
@@@ -1376,19 -1472,18 +1389,25 @@@
          throw new IllegalStateException("Exception waiting on watcher", e);
        }
      }
-     log.info("exiting");
+     getShutdownComplete().set(true);
+     log.info("stop requested. exiting ... ");
+     try {
+       managerLock.unlock();
+     } catch (Exception e) {
+       log.warn("Failed to release Manager lock", e);
+     }
    }
  
 -  protected Fate<Manager> initializeFateInstance(TStore<Manager> store,
 -      AccumuloConfiguration conf) {
 -    return new Fate<>(this, store, TraceRepo::toLogString, conf);
 +  protected Fate<Manager> initializeFateInstance(ServerContext context, FateStore<Manager> store) {
 +
 +    final Fate<Manager> fateInstance =
 +        new Fate<>(this, store, true, TraceRepo::toLogString, getConfiguration());
 +
 +    var fateCleaner = new FateCleaner<>(store, Duration.ofHours(8), this::getSteadyTime);
 +    ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor()
 +        .scheduleWithFixedDelay(fateCleaner::ageOff, 10, 4 * 60, MINUTES));
 +
 +    return fateInstance;
    }
  
    /**
@@@ -1496,13 -1591,12 +1515,14 @@@
          getHostname() + ":" + 
getConfiguration().getPort(Property.MANAGER_CLIENTPORT)[0];
  
      UUID zooLockUUID = UUID.randomUUID();
 -    ServiceLockData sld =
 -        new ServiceLockData(zooLockUUID, managerClientAddress, ThriftService.MANAGER);
  
 +    ServiceDescriptors descriptors = new ServiceDescriptors();
 +    descriptors.addService(new ServiceDescriptor(zooLockUUID, ThriftService.MANAGER,
 +        managerClientAddress, this.getResourceGroup()));
 +    ServiceLockData sld = new ServiceLockData(descriptors);
      managerLock = new ServiceLock(zooKeeper, zManagerLoc, zooLockUUID);
-     HAServiceLockWatcher managerLockWatcher = new HAServiceLockWatcher(Type.MANAGER);
+     HAServiceLockWatcher managerLockWatcher =
 -        new HAServiceLockWatcher("manager", () -> getShutdownComplete().get());
++        new HAServiceLockWatcher(Type.MANAGER, () -> getShutdownComplete().get());
  
      while (true) {
  
diff --cc server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java
index 2bf22a2841,1222ceba87..94ddc573f1
--- a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java
@@@ -331,6 -327,40 +332,17 @@@ public class ManagerClientServiceHandle
      log.debug("FATE op shutting down " + tabletServer + " finished");
    }
  
+   @Override
+   public void tabletServerStopping(TInfo tinfo, TCredentials credentials, String tabletServer)
+       throws ThriftSecurityException, ThriftNotActiveServiceException, TException {
+     if (!manager.security.canPerformSystemActions(credentials)) {
+       throw new ThriftSecurityException(credentials.getPrincipal(),
+           SecurityErrorCode.PERMISSION_DENIED);
+     }
+     log.info("Tablet Server {} has reported it's shutting down", 
tabletServer);
+     manager.tserverSet.tabletServerShuttingDown(tabletServer);
+   }
+ 
 -  @Override
 -  public void reportSplitExtent(TInfo info, TCredentials credentials, String serverName,
 -      TabletSplit split) throws ThriftSecurityException {
 -    if (!manager.security.canPerformSystemActions(credentials)) {
 -      throw new ThriftSecurityException(credentials.getPrincipal(),
 -          SecurityErrorCode.PERMISSION_DENIED);
 -    }
 -
 -    KeyExtent oldTablet = KeyExtent.fromThrift(split.oldTablet);
 -    if (manager.migrations.remove(oldTablet) != null) {
 -      Manager.log.info("Canceled migration of {}", split.oldTablet);
 -    }
 -    for (TServerInstance instance : manager.tserverSet.getCurrentServers()) {
 -      if (serverName.equals(instance.getHostPort())) {
 -        manager.nextEvent.event("%s reported split %s, %s", serverName,
 -            KeyExtent.fromThrift(split.newTablets.get(0)),
 -            KeyExtent.fromThrift(split.newTablets.get(1)));
 -        return;
 -      }
 -    }
 -    Manager.log.warn("Got a split from a server we don't recognize: {}", 
serverName);
 -  }
 -
    @Override
    public void reportTabletStatus(TInfo info, TCredentials credentials, String serverName,
        TabletLoadState status, TKeyExtent ttablet) throws ThriftSecurityException {
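
Note: tabletServerStopping is the new Thrift entry point that lets a tablet server announce an orderly shutdown so the Manager stops treating it as assignable. Like the other system RPCs here, it checks system permission before mutating Manager state. A hedged sketch of that guard-then-record shape, using hypothetical SecurityGuard and ServerRegistry interfaces (illustrative names, not Accumulo types):

    // Hypothetical skeleton; names are illustrative only.
    interface SecurityGuard {
      boolean canPerformSystemActions(String principal);
    }

    interface ServerRegistry {
      void markShuttingDown(String server);
    }

    class StoppingRpcSketch {
      private final SecurityGuard security;
      private final ServerRegistry registry;

      StoppingRpcSketch(SecurityGuard security, ServerRegistry registry) {
        this.security = security;
        this.registry = registry;
      }

      void tabletServerStopping(String principal, String tabletServer) {
        if (!security.canPerformSystemActions(principal)) {
          // mirrors throwing ThriftSecurityException(PERMISSION_DENIED)
          throw new SecurityException("permission denied for " + principal);
        }
        registry.markShuttingDown(tabletServer); // tserverSet.tabletServerShuttingDown analogue
      }
    }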
diff --cc server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
index ac142dc671,b2e60a04bc..4bcb06a552
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
@@@ -423,6 -422,22 +423,22 @@@ public class Monitor extends AbstractSe
      }).start();
  
      monitorInitialized.set(true);
+ 
+     while (!isShutdownRequested()) {
+       if (Thread.currentThread().isInterrupted()) {
 -        LOG.info("Server process thread has been interrupted, shutting down");
++        log.info("Server process thread has been interrupted, shutting down");
+         break;
+       }
+       try {
+         Thread.sleep(1000);
+       } catch (InterruptedException e) {
 -        LOG.info("Interrupt Exception received, shutting down");
++        log.info("Interrupt Exception received, shutting down");
+         gracefulShutdown(context.rpcCreds());
+       }
+     }
+ 
+     server.stop();
+     log.info("stop requested. exiting ... ");
    }
  
    private ServletHolder getDefaultServlet() {
@@@ -718,13 -737,12 +734,14 @@@
      // Get a ZooLock for the monitor
      UUID zooLockUUID = UUID.randomUUID();
      monitorLock = new ServiceLock(context.getZooSession(), monitorLockPath, zooLockUUID);
-     HAServiceLockWatcher monitorLockWatcher = new HAServiceLockWatcher(Type.MONITOR);
+     HAServiceLockWatcher monitorLockWatcher =
 -        new HAServiceLockWatcher("monitor", () -> isShutdownRequested());
++        new HAServiceLockWatcher(Type.MONITOR, () -> isShutdownRequested());
  
      while (true) {
 -      monitorLock.lock(monitorLockWatcher, new ServiceLockData(zooLockUUID,
 -          monitorLocation.getHost() + ":" + monitorLocation.getPort(), ThriftService.NONE));
 +      monitorLock.lock(monitorLockWatcher,
 +          new ServiceLockData(zooLockUUID,
 +              monitorLocation.getHost() + ":" + monitorLocation.getPort(), ThriftService.NONE,
 +              this.getResourceGroup()));
  
        monitorLockWatcher.waitForChange();
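
Note: The HAServiceLockWatcher change repeated above for GC, Manager, and Monitor replaces the free-form service-name string with the lock Type enum and adds a Supplier<Boolean> the watcher can consult when the lock is lost. A supplier rather than a captured boolean matters because the value is read when the lock event fires, not when the watcher is constructed. A hedged sketch of that idea (the exit-vs-fail behavior here is illustrative, not a claim about ServiceLockSupport internals):

    import java.util.function.Supplier;

    class LockLossSketch {
      private final Supplier<Boolean> shutdownComplete;

      LockLossSketch(Supplier<Boolean> shutdownComplete) {
        this.shutdownComplete = shutdownComplete;
      }

      void lostLock() {
        if (shutdownComplete.get()) { // evaluated now, against current server state
          System.out.println("lock released during an orderly shutdown");
        } else {
          throw new IllegalStateException("lock lost unexpectedly"); // halt analogue
        }
      }
    }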
  
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
index 8a409d566b,2ebaab6fe3..1a00514f7f
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
@@@ -303,9 -301,10 +302,9 @@@ public class ScanServer extends Abstrac
  
     // This class implements TabletClientService.Iface and then delegates calls. Be sure
      // to set up the ThriftProcessor using this class, not the delegate.
 -    ClientServiceHandler clientHandler =
 -        new ClientServiceHandler(context, new TransactionWatcher(context));
 +    ClientServiceHandler clientHandler = new ClientServiceHandler(context);
      TProcessor processor =
-         ThriftProcessorTypes.getScanServerTProcessor(clientHandler, this, getContext());
+         ThriftProcessorTypes.getScanServerTProcessor(this, clientHandler, this, getContext());
  
      ServerAddress sp = TServerUtils.startServer(getContext(), getHostname(),
          Property.SSERV_CLIENTPORT, processor, this.getClass().getSimpleName(),
@@@ -327,16 -326,27 +326,16 @@@
     * Set up nodes and locks in ZooKeeper for this Compactor
     */
    private ServiceLock announceExistence() {
 -    ZooReaderWriter zoo = getContext().getZooSession().asReaderWriter();
 +    final ZooReaderWriter zoo = getContext().getZooSession().asReaderWriter();
      try {
  
 -      var zLockPath = ServiceLock.path(
 -          getContext().getZooKeeperRoot() + Constants.ZSSERVERS + "/" + getClientAddressString());
 -
 -      try {
 -        // Old zk nodes can be cleaned up by ZooZap
 -        zoo.putPersistentData(zLockPath.toString(), new byte[] {}, NodeExistsPolicy.SKIP);
 -      } catch (KeeperException e) {
 -        if (e.code() == KeeperException.Code.NOAUTH) {
 -          LOG.error("Failed to write to ZooKeeper. Ensure that"
 -              + " accumulo.properties, specifically instance.secret, is 
consistent.");
 -        }
 -        throw e;
 -      }
 -
 +      final ServiceLockPath zLockPath =
 +          context.getServerPaths().createScanServerPath(getResourceGroup(), clientAddress);
 +      ServiceLockSupport.createNonHaServiceLockPath(Type.SCAN_SERVER, zoo, zLockPath);
        serverLockUUID = UUID.randomUUID();
      scanServerLock = new ServiceLock(getContext().getZooSession(), zLockPath, serverLockUUID);
-       LockWatcher lw = new ServiceLockWatcher(Type.SCAN_SERVER, () -> serverStopRequested,
 -      LockWatcher lw = new ServiceLockWatcher("scan server", () -> getShutdownComplete().get(),
 -          (name) -> context.getLowMemoryDetector().logGCInfo(getConfiguration()));
++      LockWatcher lw = new ServiceLockWatcher(Type.SCAN_SERVER, () -> getShutdownComplete().get(),
 +          (type) -> context.getLowMemoryDetector().logGCInfo(getConfiguration()));
  
        for (int i = 0; i < 120 / 5; i++) {
         zoo.putPersistentData(zLockPath.toString(), new byte[0], NodeExistsPolicy.SKIP);
@@@ -390,37 -400,40 +389,57 @@@
     // We need to set the compaction manager so that we don't get an NPE in CompactableImpl.close
  
      ServiceLock lock = announceExistence();
 +    this.getContext().setServiceLock(lock);
 +
 +    int threadPoolSize = getConfiguration().getCount(Property.SSERV_WAL_SORT_MAX_CONCURRENT);
 +    if (threadPoolSize > 0) {
-       final LogSorter logSorter = new LogSorter(context, getConfiguration());
++      final LogSorter logSorter = new LogSorter(this);
 +      try {
 +        // Attempt to process all existing log sorting work and start a background
 +        // thread to look for log sorting work in the future
-         logSorter.startWatchingForRecoveryLogs(threadPoolSize);
++        logSorter.startWatchingForRecoveryLogs();
 +      } catch (Exception ex) {
 +        LOG.error("Error starting LogSorter");
 +        throw new RuntimeException(ex);
 +      }
 +    } else {
 +      LOG.info(
 +          "Log sorting for tablet recovery is disabled, 
SSERV_WAL_SORT_MAX_CONCURRENT is less than 1.");
 +    }
  
      try {
-       while (!serverStopRequested) {
-         UtilWaitThread.sleep(1000);
-         updateIdleStatus(
-             sessionManager.getActiveScans().isEmpty() && tabletMetadataCache.estimatedSize() == 0);
+       while (!isShutdownRequested()) {
+         if (Thread.currentThread().isInterrupted()) {
+           LOG.info("Server process thread has been interrupted, shutting 
down");
+           break;
+         }
+         try {
+           Thread.sleep(1000);
+           updateIdleStatus(sessionManager.getActiveScans().isEmpty()
+               && tabletMetadataCache.estimatedSize() == 0);
+         } catch (InterruptedException e) {
+           LOG.info("Interrupt Exception received, shutting down");
+           gracefulShutdown(getContext().rpcCreds());
+         }
        }
      } finally {
-       LOG.info("Stopping Thrift Servers");
+       // Wait for active scans to go to zero
+       while (!sessionManager.getActiveScans().isEmpty()) {
+         LOG.debug("Waiting on {} active scans to complete.",
+             sessionManager.getActiveScans().size());
+         UtilWaitThread.sleep(1000);
+       }
+ 
+       LOG.debug("Stopping Thrift Servers");
        address.server.stop();
  
-       LOG.info("Removing server scan references");
-       this.getContext().getAmple().scanServerRefs().delete(clientAddress.toString(),
-           serverLockUUID);
+       try {
+         LOG.info("Removing server scan references");
+         this.getContext().getAmple().scanServerRefs().delete(clientAddress.toString(),
+             serverLockUUID);
+       } catch (Exception e) {
+         LOG.warn("Failed to remove scan server refs from metadata location", 
e);
+       }
  
        try {
          LOG.debug("Closing filesystems");
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index aea44fcdaf,333e35d542..4686460c72
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@@ -94,26 -93,29 +96,32 @@@ import org.apache.accumulo.core.manager
  import org.apache.accumulo.core.manager.thrift.TabletServerStatus;
  import org.apache.accumulo.core.metadata.AccumuloTable;
  import org.apache.accumulo.core.metadata.TServerInstance;
 +import org.apache.accumulo.core.metadata.schema.Ample;
+ import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
  import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 -import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
  import org.apache.accumulo.core.metrics.MetricsInfo;
 -import org.apache.accumulo.core.process.thrift.ServerProcessService;
  import org.apache.accumulo.core.rpc.ThriftUtil;
  import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
  import org.apache.accumulo.core.spi.fs.VolumeChooserEnvironment;
 +import org.apache.accumulo.core.spi.ondemand.OnDemandTabletUnloader;
 +import org.apache.accumulo.core.spi.ondemand.OnDemandTabletUnloader.UnloaderParams;
+ import org.apache.accumulo.core.tablet.thrift.TUnloadTabletGoal;
 +import org.apache.accumulo.core.tabletserver.UnloaderParamsImpl;
  import org.apache.accumulo.core.tabletserver.log.LogEntry;
+ import org.apache.accumulo.core.trace.TraceUtil;
  import org.apache.accumulo.core.util.ComparablePair;
+ import org.apache.accumulo.core.util.Halt;
  import org.apache.accumulo.core.util.MapCounter;
  import org.apache.accumulo.core.util.Pair;
  import org.apache.accumulo.core.util.Retry;
  import org.apache.accumulo.core.util.Retry.RetryFactory;
  import org.apache.accumulo.core.util.UtilWaitThread;
+ import org.apache.accumulo.core.util.threads.ThreadPoolNames;
  import org.apache.accumulo.core.util.threads.Threads;
+ import org.apache.accumulo.core.util.time.SteadyTime;
  import org.apache.accumulo.server.AbstractServer;
  import org.apache.accumulo.server.ServerContext;
 +import org.apache.accumulo.server.ServiceEnvironmentImpl;
  import org.apache.accumulo.server.TabletLevel;
  import org.apache.accumulo.server.client.ClientServiceHandler;
  import org.apache.accumulo.server.compaction.CompactionWatcher;
@@@ -238,7 -255,7 +243,7 @@@ public class TabletServer extends Abstr
      log.info("Version " + Constants.VERSION);
      log.info("Instance " + getInstanceID());
      this.sessionManager = new SessionManager(context);
--    this.logSorter = new LogSorter(context, aconf);
++    this.logSorter = new LogSorter(this);
      this.statsKeeper = new TabletStatsKeeper();
     final int numBusyTabletsToLog = aconf.getCount(Property.TSERV_LOG_BUSY_TABLETS_COUNT);
      final long logBusyTabletsDelay =
@@@ -375,9 -390,22 +380,9 @@@
  
    void requestStop() {
      log.info("Stop requested.");
-     serverStopRequested = true;
+     gracefulShutdown(getContext().rpcCreds());
    }
  
 -  private class SplitRunner implements Runnable {
 -    private final Tablet tablet;
 -
 -    public SplitRunner(Tablet tablet) {
 -      this.tablet = tablet;
 -    }
 -
 -    @Override
 -    public void run() {
 -      splitTablet(tablet);
 -    }
 -  }
 -
    public long updateTotalQueuedMutationSize(long additionalMutationSize) {
      var newTotal = totalQueuedMutationSize.addAndGet(additionalMutationSize);
      if (log.isTraceEnabled()) {
@@@ -498,8 -639,8 +503,8 @@@
        UUID tabletServerUUID = UUID.randomUUID();
      tabletServerLock = new ServiceLock(getContext().getZooSession(), zLockPath, tabletServerUUID);
  
-       LockWatcher lw = new ServiceLockWatcher(Type.TABLET_SERVER, () -> serverStopRequested,
 -      LockWatcher lw = new ServiceLockWatcher("tablet server", () -> getShutdownComplete().get(),
 -          (name) -> context.getLowMemoryDetector().logGCInfo(getConfiguration()));
++      LockWatcher lw = new ServiceLockWatcher(Type.TABLET_SERVER, () -> getShutdownComplete().get(),
 +          (type) -> context.getLowMemoryDetector().logGCInfo(getConfiguration()));
  
        for (int i = 0; i < 120 / 5; i++) {
        zoo.putPersistentData(zLockPath.toString(), new byte[0], NodeExistsPolicy.SKIP);
@@@ -583,32 -729,73 +588,36 @@@
        throw new RuntimeException(e);
      }
  
 -    try {
 -      logSorter.startWatchingForRecoveryLogs(this);
 -    } catch (Exception ex) {
 -      log.error("Error setting watches for recoveries");
 -      throw new RuntimeException(ex);
 +    int threadPoolSize =
 +        getContext().getConfiguration().getCount(Property.TSERV_WAL_SORT_MAX_CONCURRENT);
 +    if (threadPoolSize > 0) {
 +      try {
 +        // Attempt to process all existing log sorting work and start a background
 +        // thread to look for log sorting work in the future
-         logSorter.startWatchingForRecoveryLogs(threadPoolSize);
++        logSorter.startWatchingForRecoveryLogs();
 +      } catch (Exception ex) {
 +        log.error("Error starting LogSorter");
 +        throw new RuntimeException(ex);
 +      }
 +    } else {
 +      log.info(
 +          "Log sorting for tablet recovery is disabled, 
TSERV_WAL_SORT_MAX_CONCURRENT is less than 1.");
      }
 -    final AccumuloConfiguration aconf = getConfiguration();
 -
 -    long tabletCheckFrequency = aconf.getTimeInMillis(Property.TSERV_HEALTH_CHECK_FREQ);
 -    // Periodically check that metadata of tablets matches what is held in memory
 -    watchCriticalFixedDelay(aconf, tabletCheckFrequency, () -> {
 -      final SortedMap<KeyExtent,Tablet> onlineTabletsSnapshot = onlineTablets.snapshot();
 -
 -      Map<KeyExtent,MetadataUpdateCount> updateCounts = new HashMap<>();
  
 -      // gather updateCounts for each tablet before reading tablet metadata
 -      onlineTabletsSnapshot.forEach((ke, tablet) -> {
 -        updateCounts.put(ke, tablet.getUpdateCount());
 -      });
 -
 -      Instant start = Instant.now();
 -      Duration duration;
 -      Span mdScanSpan = TraceUtil.startSpan(this.getClass(), "metadataScan");
 -      try (Scope scope = mdScanSpan.makeCurrent()) {
 -        List<KeyExtent> missingTablets = new ArrayList<>();
 -        // gather metadata for all tablets readTablets()
 -        try (TabletsMetadata tabletsMetadata = getContext().getAmple().readTablets()
 -            .forTablets(onlineTabletsSnapshot.keySet(), Optional.of(missingTablets::add))
 -            .fetch(FILES, LOGS, ECOMP, PREV_ROW).build()) {
 -          duration = Duration.between(start, Instant.now());
 -          log.debug("Metadata scan took {}ms for {} tablets read.", duration.toMillis(),
 -              onlineTabletsSnapshot.keySet().size());
 -
 -          // for each tablet, compare its metadata to what is held in memory
 -          for (var tabletMetadata : tabletsMetadata) {
 -            KeyExtent extent = tabletMetadata.getExtent();
 -            Tablet tablet = onlineTabletsSnapshot.get(extent);
 -            MetadataUpdateCount counter = updateCounts.get(extent);
 -            tablet.compareTabletInfo(counter, tabletMetadata);
 -          }
 +    final AccumuloConfiguration aconf = getConfiguration();
  
 -          for (var extent : missingTablets) {
 -            Tablet tablet = onlineTabletsSnapshot.get(extent);
 -            if (!tablet.isClosed()) {
 -              log.error("Tablet {} is open but does not exist in metadata 
table.", extent);
 -            }
 -          }
 -        }
 -      } catch (Exception e) {
 -        log.error("Unable to complete verification of tablet metadata", e);
 -        TraceUtil.setException(mdScanSpan, e, true);
 -      } finally {
 -        mdScanSpan.end();
 -      }
 +    final long onDemandUnloaderInterval =
 +        aconf.getTimeInMillis(Property.TSERV_ONDEMAND_UNLOADER_INTERVAL);
 +    watchCriticalFixedDelay(aconf, onDemandUnloaderInterval, () -> {
 +      evaluateOnDemandTabletsForUnload();
      });
  
 -    final long CLEANUP_BULK_LOADED_CACHE_MILLIS = TimeUnit.MINUTES.toMillis(15);
 -    watchCriticalScheduledTask(context.getScheduledExecutor().scheduleWithFixedDelay(
 -        new BulkImportCacheCleaner(this), CLEANUP_BULK_LOADED_CACHE_MILLIS,
 -        CLEANUP_BULK_LOADED_CACHE_MILLIS, TimeUnit.MILLISECONDS));
 -
      HostAndPort managerHost;
-     while (!serverStopRequested) {
+     while (!isShutdownRequested()) {
+       if (Thread.currentThread().isInterrupted()) {
 -        LOG.info("Server process thread has been interrupted, shutting down");
++        log.info("Server process thread has been interrupted, shutting down");
+         break;
+       }
  
        updateIdleStatus(getOnlineTablets().isEmpty());
  
@@@ -673,20 -860,77 +682,77 @@@
        }
      }
  
-     // wait for shutdown
-     // if the main thread exits oldServer the manager listener, the JVM will
-     // kill the other threads and finalize objects. We want the shutdown that is
-     // running in the manager listener thread to complete oldServer this happens.
-     // consider making other threads daemon threads so that objects don't
-     // get prematurely finalized
-     synchronized (this) {
-       while (!shutdownComplete) {
+     // Tell the Manager we are shutting down so that it doesn't try
+     // to assign tablets.
+     ManagerClientService.Client iface = managerConnection(getManagerAddress());
+     try {
+       iface.tabletServerStopping(TraceUtil.traceInfo(), getContext().rpcCreds(),
+           getClientAddressString());
+     } catch (TException e) {
 -      LOG.error("Error informing Manager that we are shutting down, halting 
server", e);
++      log.error("Error informing Manager that we are shutting down, halting 
server", e);
+       Halt.halt("Error informing Manager that we are shutting down, 
exiting!", -1);
+     } finally {
+       returnManagerConnection(iface);
+     }
+ 
+     // Best-effort attempt at unloading tablets.
+     log.debug("Unloading tablets");
+     final List<Future<?>> futures = new ArrayList<>();
+     final ThreadPoolExecutor tpe = getContext().threadPools()
+         .getPoolBuilder(ThreadPoolNames.TSERVER_SHUTDOWN_UNLOAD_TABLET_POOL).numCoreThreads(8)
+         .numMaxThreads(16).build();
+ 
+     iface = managerConnection(getManagerAddress());
+     boolean managerDown = false;
+ 
+     try {
+       for (DataLevel level : new DataLevel[] {DataLevel.USER, DataLevel.METADATA, DataLevel.ROOT}) {
+         getOnlineTablets().keySet().forEach(ke -> {
+           if (DataLevel.of(ke.tableId()) == level) {
+             futures.add(tpe.submit(new UnloadTabletHandler(this, ke, TUnloadTabletGoal.UNASSIGNED,
+                 SteadyTime.from(System.currentTimeMillis(), TimeUnit.MILLISECONDS))));
+           }
+         });
+         while (!futures.isEmpty()) {
+           Iterator<Future<?>> unloads = futures.iterator();
+           while (unloads.hasNext()) {
+             Future<?> f = unloads.next();
+             if (f.isDone()) {
+               if (!managerDown) {
+                 ManagerMessage mm = managerMessages.poll();
+                 try {
+                   mm.send(getContext().rpcCreds(), getClientAddressString(), iface);
+                 } catch (TException e) {
+                   managerDown = true;
 -                  LOG.debug("Error sending message to Manager during tablet 
unloading, msg: {}",
++                  log.debug("Error sending message to Manager during tablet 
unloading, msg: {}",
+                       e.getMessage());
+                 }
+               }
+               unloads.remove();
+             }
+           }
+           log.debug("Waiting on {} {} tablets to close.", futures.size(), 
level);
+           UtilWaitThread.sleep(1000);
+         }
+         log.debug("All {} tablets unloaded", level);
+       }
+     } finally {
+       if (!managerDown) {
          try {
-           this.wait(1000);
-         } catch (InterruptedException e) {
-           log.error(e.toString());
+           ManagerMessage mm = managerMessages.poll();
+           do {
+             if (mm != null) {
+               mm.send(getContext().rpcCreds(), getClientAddressString(), iface);
+             }
+             mm = managerMessages.poll();
+           } while (mm != null);
+         } catch (TException e) {
 -          LOG.debug("Error sending message to Manager during tablet 
unloading, msg: {}",
++          log.debug("Error sending message to Manager during tablet 
unloading, msg: {}",
+               e.getMessage());
          }
        }
+       returnManagerConnection(iface);
+       tpe.shutdown();
      }
  
      log.debug("Stopping Thrift Servers");
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
index 1fdae2890e,1810e04791..74fa6680be
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
@@@ -222,15 -222,15 +223,17 @@@ public class LogSorter 
      }
    }
  
++  private final AbstractServer server;
    private final ServerContext context;
    private final AccumuloConfiguration conf;
    private final double walBlockSize;
    private final CryptoService cryptoService;
    private final AccumuloConfiguration sortedLogConf;
  
--  public LogSorter(ServerContext context, AccumuloConfiguration conf) {
--    this.context = context;
--    this.conf = conf;
++  public LogSorter(AbstractServer server) {
++    this.server = server;
++    this.context = this.server.getContext();
++    this.conf = this.context.getConfiguration();
      this.sortedLogConf = extractSortedLogConfig(this.conf);
      this.walBlockSize = DfsLogger.getWalBlockSize(this.conf);
     CryptoEnvironment env = new CryptoEnvironmentImpl(CryptoEnvironment.Scope.RECOVERY);
@@@ -292,31 -292,14 +295,31 @@@
      }
    }
  
 -  public void startWatchingForRecoveryLogs(AbstractServer server)
 -      throws KeeperException, InterruptedException {
 +  /**
 +   * Sort any logs that need sorting in the current thread.
 +   *
 +   * @return The time in millis when the next check can be done.
 +   */
 +  public long sortLogsIfNeeded() throws KeeperException, InterruptedException {
 +    DistributedWorkQueue dwq = new DistributedWorkQueue(
-         context.getZooKeeperRoot() + Constants.ZRECOVERY, sortedLogConf, context);
++        context.getZooKeeperRoot() + Constants.ZRECOVERY, sortedLogConf, server);
 +    dwq.processExistingWork(new LogProcessor(), MoreExecutors.newDirectExecutorService(), 1, false);
 +    return System.currentTimeMillis() + dwq.getCheckInterval();
 +  }
 +
 +  /**
 +   * Sort any logs that need sorting in a ThreadPool using
 +   * {@link Property#TSERV_WAL_SORT_MAX_CONCURRENT} threads. This method will start a background
 +   * thread to look for log sorting work in the future that will be processed by the
 +   * ThreadPoolExecutor
 +   */
-   public void startWatchingForRecoveryLogs(int threadPoolSize)
-       throws KeeperException, InterruptedException {
++  public void startWatchingForRecoveryLogs() throws KeeperException, InterruptedException {
+     int threadPoolSize = this.conf.getCount(Property.TSERV_WAL_SORT_MAX_CONCURRENT);
      ThreadPoolExecutor threadPool =
          ThreadPools.getServerThreadPools().getPoolBuilder(TSERVER_WAL_SORT_CONCURRENT_POOL)
              .numCoreThreads(threadPoolSize).enableThreadPoolMetrics().build();
      new DistributedWorkQueue(context.getZooKeeperRoot() + Constants.ZRECOVERY, sortedLogConf,
-         context).processExistingAndFuture(new LogProcessor(), threadPool);
 -        server).startProcessing(new LogProcessor(), threadPool);
++        server).processExistingAndFuture(new LogProcessor(), threadPool);
    }
  
    public List<RecoveryStatus> getLogSorts() {
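
Note: LogSorter now drives the same DistributedWorkQueue in two modes: sortLogsIfNeeded() makes a single pass over existing recovery work on the calling thread (a direct executor), while startWatchingForRecoveryLogs() also registers for future work and hands it to a pool sized by TSERV_WAL_SORT_MAX_CONCURRENT. A sketch of that one-shot vs. watch split over a hypothetical WorkQueue interface (not the DistributedWorkQueue API):

    import java.util.concurrent.Executor;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    interface WorkQueue { // hypothetical stand-in for DistributedWorkQueue
      void processExistingWork(Runnable processor, Executor executor);      // one pass, no watcher
      void processExistingAndFuture(Runnable processor, Executor executor); // one pass plus a watcher
    }

    class LogSortModes {
      // One-shot: drain pending sort work on the calling thread (direct-executor analogue).
      static void sortIfNeeded(WorkQueue queue, Runnable processor) {
        queue.processExistingWork(processor, Runnable::run);
      }

      // Background: drain now and keep watching, using a bounded pool sized from config.
      static ExecutorService watch(WorkQueue queue, Runnable processor, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        queue.processExistingAndFuture(processor, pool);
        return pool; // caller owns shutdown
      }
    }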
diff --cc server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsIteratorTest.java
index dd9b689229,dd9b689229..3d070c983b
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsIteratorTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsIteratorTest.java
@@@ -23,6 -23,6 +23,7 @@@ import static org.apache.accumulo.tserv
  import static org.easymock.EasyMock.createMock;
  import static org.easymock.EasyMock.expect;
  import static org.easymock.EasyMock.replay;
++import static org.easymock.EasyMock.verify;
  import static org.junit.jupiter.api.Assertions.assertEquals;
  import static org.junit.jupiter.api.Assertions.assertNotNull;
  import static org.junit.jupiter.api.Assertions.assertThrows;
@@@ -48,6 -48,6 +49,7 @@@ import org.apache.accumulo.server.Serve
  import org.apache.accumulo.server.fs.VolumeManager;
  import org.apache.accumulo.server.fs.VolumeManagerImpl;
  import org.apache.accumulo.server.log.SortedLogState;
++import org.apache.accumulo.tserver.TabletServer;
  import org.apache.accumulo.tserver.WithTestNames;
  import org.apache.accumulo.tserver.logger.LogFileKey;
  import org.apache.accumulo.tserver.logger.LogFileValue;
@@@ -66,6 -66,6 +68,7 @@@ public class RecoveryLogsIteratorTest e
    private VolumeManager fs;
    private File workDir;
    static final KeyExtent extent = new KeyExtent(TableId.of("table"), null, 
null);
++  static TabletServer server;
    static ServerContext context;
    static LogSorter logSorter;
  
@@@ -75,21 -75,21 +78,23 @@@
    @BeforeEach
    public void setUp() throws Exception {
      context = createMock(ServerContext.class);
--
++    server = createMock(TabletServer.class);
      workDir = new File(tempDir, testName());
      String path = workDir.getAbsolutePath();
      fs = VolumeManagerImpl.getLocalForTesting(path);
++    expect(server.getContext()).andReturn(context).anyTimes();
    expect(context.getCryptoFactory()).andReturn(new GenericCryptoServiceFactory()).anyTimes();
    expect(context.getVolumeManager()).andReturn(fs).anyTimes();
    expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
--    replay(context);
++    replay(server, context);
  
--    logSorter = new LogSorter(context, DefaultConfiguration.getInstance());
++    logSorter = new LogSorter(server);
    }
  
    @AfterEach
    public void tearDown() throws Exception {
      fs.close();
++    verify(server, context);
    }
  
    static class KeyValue implements Comparable<KeyValue> {
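
Note: Since LogSorter now takes the server instead of a context/config pair, the updated tests all follow the same EasyMock recipe: mock the server, stub getContext() to return the mocked context, then replay and verify both mocks together. A minimal self-contained version of that recipe (Server and Context are illustrative interfaces, not the Accumulo types):

    import static org.easymock.EasyMock.createMock;
    import static org.easymock.EasyMock.expect;
    import static org.easymock.EasyMock.replay;
    import static org.easymock.EasyMock.verify;

    class MockRecipeSketch {
      interface Context { String config(); }
      interface Server { Context getContext(); }

      static void example() {
        Context context = createMock(Context.class);
        Server server = createMock(Server.class);
        expect(server.getContext()).andReturn(context).anyTimes(); // server delegates to its context
        expect(context.config()).andReturn("default").anyTimes();
        replay(server, context);

        // exercise the code under test here (the real tests build new LogSorter(server))
        server.getContext().config();

        verify(server, context);
      }
    }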
diff --cc server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
index 5645dea90f,5645dea90f..bffb83f594
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
@@@ -70,6 -70,6 +70,7 @@@ import org.apache.accumulo.server.data.
  import org.apache.accumulo.server.fs.VolumeManager;
  import org.apache.accumulo.server.fs.VolumeManagerImpl;
  import org.apache.accumulo.server.log.SortedLogState;
++import org.apache.accumulo.tserver.TabletServer;
  import org.apache.accumulo.tserver.WithTestNames;
  import org.apache.accumulo.tserver.logger.LogEvents;
  import org.apache.accumulo.tserver.logger.LogFileKey;
@@@ -96,6 -96,6 +97,7 @@@ public class SortedLogRecoveryTest exte
    static final Text cf = new Text("cf");
    static final Text cq = new Text("cq");
    static final Value value = new Value("value");
++  static TabletServer server;
    static ServerContext context;
    static LogSorter logSorter;
  
@@@ -104,6 -104,6 +106,7 @@@
  
    @BeforeEach
    public void setup() {
++    server = EasyMock.createMock(TabletServer.class);
      context = EasyMock.createMock(ServerContext.class);
    }
  
@@@ -186,12 -186,12 +189,12 @@@
      final String workdir = new File(tempDir, testName()).getAbsolutePath();
      try (var fs = VolumeManagerImpl.getLocalForTesting(workdir)) {
        CryptoServiceFactory cryptoFactory = new GenericCryptoServiceFactory();
--
++      expect(server.getContext()).andReturn(context).anyTimes();
        expect(context.getVolumeManager()).andReturn(fs).anyTimes();
      expect(context.getCryptoFactory()).andReturn(cryptoFactory).anyTimes();
      expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
--      replay(context);
--      logSorter = new LogSorter(context, DefaultConfiguration.getInstance());
++      replay(server, context);
++      logSorter = new LogSorter(server);
  
        final Path workdirPath = new Path("file://" + workdir);
        fs.deleteRecursively(workdirPath);
@@@ -223,7 -223,7 +226,7 @@@
      SortedLogRecovery recovery = new SortedLogRecovery(context, fileLenCache, cacheProvider);
        CaptureMutations capture = new CaptureMutations();
        recovery.recover(extent, dirs, files, capture);
--      verify(context);
++      verify(server, context);
        return capture.result;
      }
    }
@@@ -809,7 -809,7 +812,7 @@@
      assertEquals(1, mutations1.size());
      assertEquals(m2, mutations1.get(0));
  
--    reset(context);
++    reset(server, context);
      List<Mutation> mutations2 = recover(logs, e2);
      assertEquals(2, mutations2.size());
      assertEquals(m3, mutations2.get(0));
@@@ -820,7 -820,7 +823,7 @@@
      Arrays.sort(entries2);
      logs.put("entries2", entries2);
  
--    reset(context);
++    reset(server, context);
      mutations2 = recover(logs, e2);
      assertEquals(1, mutations2.size());
      assertEquals(m4, mutations2.get(0));
@@@ -860,7 -860,7 +863,7 @@@
    // test having different paths for the same file. This can happen as a result of upgrade or user
      // changing configuration
      runPathTest(false, "/t1/f1", "/t1/f0");
--    reset(context);
++    reset(server, context);
      runPathTest(true, "/t1/f1", "/t1/f0", "/t1/f1");
  
      String[] aliases = {"/t1/f1", "hdfs://nn1/accumulo/tables/8/t1/f1",
@@@ -871,12 -871,12 +874,12 @@@
  
      for (String alias1 : aliases) {
        for (String alias2 : aliases) {
--        reset(context);
++        reset(server, context);
          runPathTest(true, alias1, alias2);
          for (String other : others) {
--          reset(context);
++          reset(server, context);
            runPathTest(true, alias1, other, alias2);
--          reset(context);
++          reset(server, context);
            runPathTest(true, alias1, alias2, other);
          }
        }
@@@ -884,7 -884,7 +887,7 @@@
  
      for (String alias1 : aliases) {
        for (String other : others) {
--        reset(context);
++        reset(server, context);
          runPathTest(false, alias1, other);
        }
      }
@@@ -1035,34 -1035,34 +1038,34 @@@
  
      logs.put("entries2", entries2);
  
--    reset(context);
++    reset(server, context);
      mutations = recover(logs, extent);
      assertEquals(1, mutations.size());
      assertEquals(m1, mutations.get(0));
  
      logs.put("entries3", entries3);
  
--    reset(context);
++    reset(server, context);
      mutations = recover(logs, extent);
      assertEquals(1, mutations.size());
      assertEquals(m1, mutations.get(0));
  
      logs.put("entries4", entries4);
  
--    reset(context);
++    reset(server, context);
      mutations = recover(logs, extent);
      assertEquals(1, mutations.size());
      assertEquals(m1, mutations.get(0));
  
      logs.put("entries5", entries5);
  
--    reset(context);
++    reset(server, context);
      mutations = recover(logs, extent);
      assertEquals(0, mutations.size());
  
      logs.put("entries6", entries6);
  
--    reset(context);
++    reset(server, context);
      mutations = recover(logs, extent);
      assertEquals(1, mutations.size());
      assertEquals(m2, mutations.get(0));
@@@ -1098,8 -1098,8 +1101,12 @@@
      // test all the possible properties for tserver.sort.file. prefix
      String prop = Property.TSERV_WAL_SORT_FILE_PREFIX + "invalid";
      testConfig.set(prop, "snappy");
--    assertThrows(IllegalArgumentException.class, () -> new LogSorter(context, testConfig),
++    expect(server.getContext()).andReturn(context).anyTimes();
++    expect(context.getConfiguration()).andReturn(testConfig).anyTimes();
++    replay(server, context);
++    assertThrows(IllegalArgumentException.class, () -> new LogSorter(server),
          "Did not throw IllegalArgumentException for " + prop);
++    verify(server, context);
    }
  
    @Test
@@@ -1122,11 -1122,11 +1129,12 @@@
  
      try (var vm = VolumeManagerImpl.getLocalForTesting(workdir)) {
        CryptoServiceFactory cryptoFactory = new GenericCryptoServiceFactory();
++      expect(server.getContext()).andReturn(context).anyTimes();
        expect(context.getCryptoFactory()).andReturn(cryptoFactory).anyTimes();
        expect(context.getVolumeManager()).andReturn(vm).anyTimes();
--      expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
--      replay(context);
--      LogSorter sorter = new LogSorter(context, testConfig);
++      expect(context.getConfiguration()).andReturn(testConfig).anyTimes();
++      replay(server, context);
++      LogSorter sorter = new LogSorter(server);
  
        final Path workdirPath = new Path("file://" + workdir);
        vm.deleteRecursively(workdirPath);
diff --cc server/tserver/src/test/java/org/apache/accumulo/tserver/log/TestUpgradePathForWALogs.java
index 8aab03b5c6,8aab03b5c6..cd7d3bf29f
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/TestUpgradePathForWALogs.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/TestUpgradePathForWALogs.java
@@@ -33,12 -33,12 +33,12 @@@ import java.io.IOException
  import java.io.InputStream;
  import java.io.OutputStream;
  
--import org.apache.accumulo.core.conf.AccumuloConfiguration;
  import org.apache.accumulo.core.conf.DefaultConfiguration;
  import org.apache.accumulo.core.spi.crypto.GenericCryptoServiceFactory;
  import org.apache.accumulo.server.ServerContext;
  import org.apache.accumulo.server.fs.VolumeManager;
  import org.apache.accumulo.server.fs.VolumeManagerImpl;
++import org.apache.accumulo.tserver.TabletServer;
  import org.apache.accumulo.tserver.WithTestNames;
  import org.apache.commons.io.IOUtils;
  import org.apache.hadoop.fs.Path;
@@@ -60,8 -60,8 +60,8 @@@ public class TestUpgradePathForWALogs e
    // logs from 2.0 were changed for improved crypto
    private static final String WALOG_FROM_20 = "/walog-from-20.walog";
  
--  private static final AccumuloConfiguration config = DefaultConfiguration.getInstance();
    private ServerContext context;
++  private TabletServer server;
  
    @TempDir
    private static File tempDir;
@@@ -71,6 -71,6 +71,7 @@@
    @BeforeEach
    public void setUp() throws Exception {
      context = createMock(ServerContext.class);
++    server = createMock(TabletServer.class);
  
      // Create a new subdirectory for each test
      perTestTempSubDir = new File(tempDir, testName());
@@@ -81,14 -81,14 +82,16 @@@
  
      VolumeManager fs = VolumeManagerImpl.getLocalForTesting(path);
  
++    expect(server.getContext()).andReturn(context).anyTimes();
++    expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
     expect(context.getCryptoFactory()).andReturn(new GenericCryptoServiceFactory()).anyTimes();
      expect(context.getVolumeManager()).andReturn(fs).anyTimes();
--    replay(context);
++    replay(server, context);
    }
  
    @AfterEach
    public void tearDown() {
--    verify(context);
++    verify(server, context);
    }
  
    /**
@@@ -105,7 -105,7 +108,7 @@@
        walogInHDFStream.flush();
        walogInHDFStream.close();
  
--      LogSorter logSorter = new LogSorter(context, config);
++      LogSorter logSorter = new LogSorter(server);
        LogSorter.LogProcessor logProcessor = logSorter.new LogProcessor();
  
        assertThrows(IllegalArgumentException.class,
@@@ -128,7 -128,7 +131,7 @@@
  
      assertFalse(context.getVolumeManager().exists(getFinishedMarkerPath(destPath)));
  
--      LogSorter logSorter = new LogSorter(context, config);
++      LogSorter logSorter = new LogSorter(server);
        LogSorter.LogProcessor logProcessor = logSorter.new LogProcessor();
  
        logProcessor.sort(context.getVolumeManager(), walogToTest,
@@@ -152,7 -152,7 +155,7 @@@
  
        
assertFalse(context.getVolumeManager().exists(getFinishedMarkerPath(destPath)));
  
--      LogSorter logSorter = new LogSorter(context, config);
++      LogSorter logSorter = new LogSorter(server);
        LogSorter.LogProcessor logProcessor = logSorter.new LogProcessor();
        logProcessor.sort(context.getVolumeManager(), walogToTest,
            new Path("file://" + testPath + walogToTest), destPath);
diff --cc test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java
index a032aeb191,4d13023a31..c579ea25c2
--- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java
+++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java
@@@ -65,8 -63,7 +65,8 @@@ public class ExternalDoNothingCompacto
        CountDownLatch stopped, AtomicReference<Throwable> err) {
  
      // Set this to true so that only 1 external compaction is run
 +    final AtomicReference<FileCompactor> ref = new AtomicReference<>();
-     this.shutdown = true;
+     gracefulShutdown(getContext().rpcCreds());
  
      return new FileCompactorRunnable() {
  
diff --cc test/src/main/java/org/apache/accumulo/test/fate/FateExecutionOrderIT.java
index 9c1234efd4,0000000000..4b377e09da
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/FateExecutionOrderIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FateExecutionOrderIT.java
@@@ -1,421 -1,0 +1,422 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.test.fate;
 +
 +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUBMITTED;
 +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUCCESSFUL;
 +import static org.junit.jupiter.api.Assertions.assertEquals;
 +import static org.junit.jupiter.api.Assertions.assertFalse;
 +import static org.junit.jupiter.api.Assertions.assertTrue;
 +
 +import java.time.Duration;
 +import java.util.AbstractMap;
 +import java.util.ArrayList;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +import java.util.SortedMap;
 +import java.util.TreeMap;
 +import java.util.concurrent.TimeUnit;
 +import java.util.stream.Collectors;
 +
 +import org.apache.accumulo.core.client.Accumulo;
 +import org.apache.accumulo.core.client.AccumuloClient;
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.MutationsRejectedException;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.client.admin.NewTableConfiguration;
 +import org.apache.accumulo.core.client.admin.TabletAvailability;
 +import org.apache.accumulo.core.conf.ConfigurationCopy;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.fate.Fate;
 +import org.apache.accumulo.core.fate.Fate.TxInfo;
 +import org.apache.accumulo.core.fate.FateId;
 +import org.apache.accumulo.core.fate.FateStore;
 +import org.apache.accumulo.core.fate.Repo;
 +import org.apache.accumulo.harness.SharedMiniClusterBase;
 +import org.apache.accumulo.server.ServerContext;
++import org.apache.accumulo.test.fate.FateTestRunner.TestEnv;
 +import org.apache.hadoop.io.Text;
 +import org.junit.jupiter.api.AfterAll;
 +import org.junit.jupiter.api.BeforeAll;
 +import org.junit.jupiter.api.BeforeEach;
 +import org.junit.jupiter.api.Test;
 +
 +import com.google.common.collect.Iterators;
 +
 +public abstract class FateExecutionOrderIT extends SharedMiniClusterBase
 +    implements FateTestRunner<FateExecutionOrderIT.FeoTestEnv> {
 +
 +  public static class FeoTestEnv extends TestEnv {
 +    private final AccumuloClient client;
 +
 +    public FeoTestEnv(AccumuloClient client) {
 +      this.client = client;
 +    }
 +
 +    AccumuloClient getClient() {
 +      return client;
 +    }
 +  }
 +
 +  public static class FirstOp implements Repo<FateExecutionOrderIT.FeoTestEnv> {
 +
 +    private static final long serialVersionUID = 1L;
 +
 +    protected boolean isTrackingDataSet(FateId tid, FeoTestEnv env, String step) throws Exception {
 +      try (Scanner scanner = env.getClient().createScanner(FATE_TRACKING_TABLE)) {
 +        return scanner.stream()
 +            .anyMatch(e -> e.getKey().getColumnFamily().toString().equals(tid.canonical())
 +                && e.getValue().toString().equals(step));
 +      }
 +    }
 +
 +    protected static void insertTrackingData(FateId tid, FeoTestEnv env, String step)
 +        throws TableNotFoundException, MutationsRejectedException {
 +      try (BatchWriter bw = env.getClient().createBatchWriter(FATE_TRACKING_TABLE)) {
 +        Mutation mut = new Mutation(Long.toString(System.currentTimeMillis()));
 +        mut.put(tid.canonical(), "", step);
 +        bw.addMutation(mut);
 +      }
 +    }
 +
 +    @Override
 +    public long isReady(FateId tid, FeoTestEnv env) throws Exception {
 +      // First call to isReady will return that it's not ready (defer time of 100ms), inserting
 +      // the data 'isReady1' so we know isReady was called once. The second attempt (after the
 +      // deferral time) will pass as ready (return 0) and insert the data 'isReady2' so we know
 +      // the second call to isReady was made
 +      Thread.sleep(50);
 +      var step = this.getName() + "::isReady";
 +      if (isTrackingDataSet(tid, env, step + "1")) {
 +        insertTrackingData(tid, env, step + "2");
 +        return 0;
 +      } else {
 +        insertTrackingData(tid, env, step + "1");
 +        return 100;
 +      }
 +    }
 +
 +    @Override
 +    public String getName() {
 +      return this.getClass().getSimpleName();
 +    }
 +
 +    @Override
 +    public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv env) throws Exception {
 +      Thread.sleep(50);
 +      insertTrackingData(tid, env, this.getName() + "::call");
 +      return new SecondOp();
 +    }
 +
 +    @Override
 +    public void undo(FateId fateId, FeoTestEnv environment) throws Exception {
 +      throw new UnsupportedOperationException();
 +    }
 +
 +    @Override
 +    public String getReturn() {
 +      return "";
 +    }
 +  }
 +
 +  public static class SecondOp extends FirstOp {
 +    private static final long serialVersionUID = 1L;
 +
 +    @Override
 +    public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv environment) throws Exception {
 +      super.call(tid, environment);
 +      return new LastOp();
 +    }
 +
 +  }
 +
 +  public static class LastOp extends FirstOp {
 +    private static final long serialVersionUID = 1L;
 +
 +    @Override
 +    public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv environment) throws Exception {
 +      super.call(tid, environment);
 +      return null;
 +    }
 +  }
 +
 +  private static final String FATE_TRACKING_TABLE = "fate_tracking";
 +
 +  @BeforeAll
 +  public static void setup() throws Exception {
 +    SharedMiniClusterBase.startMiniCluster();
 +    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
 +      NewTableConfiguration ntc = new NewTableConfiguration();
 +      ntc.withInitialTabletAvailability(TabletAvailability.HOSTED);
 +      client.tableOperations().create(FATE_TRACKING_TABLE, ntc);
 +    }
 +  }
 +
 +  @AfterAll
 +  public static void teardown() throws Exception {
 +    SharedMiniClusterBase.stopMiniCluster();
 +  }
 +
 +  @BeforeEach
 +  public void before() throws Exception {
 +    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
 +      client.tableOperations().deleteRows(FATE_TRACKING_TABLE, null, null);
 +    }
 +  }
 +
 +  private void waitFor(FateStore<FeoTestEnv> store, FateId txid) throws Exception {
 +    while (store.read(txid).getStatus() != SUCCESSFUL) {
 +      Thread.sleep(50);
 +    }
 +  }
 +
 +  protected Fate<FeoTestEnv> initializeFate(AccumuloClient client, FateStore<FeoTestEnv> store) {
 +    ConfigurationCopy config = new ConfigurationCopy();
 +    config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
 +    config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
 +    return new Fate<>(new FeoTestEnv(client), store, false, r -> r + "", config);
 +  }
 +
 +  private static Entry<FateId,String> toIdStep(Entry<Key,Value> e) {
 +    return new AbstractMap.SimpleImmutableEntry<>(
 +        FateId.from(e.getKey().getColumnFamily().toString()), e.getValue().toString());
 +  }
 +
 +  @Test
 +  public void testInterleaving() throws Exception {
 +    executeTest(this::testInterleaving);
 +  }
 +
 +  protected void testInterleaving(FateStore<FeoTestEnv> store, ServerContext sctx)
 +      throws Exception {
 +
 +    // This test verifies that FATE will interleave at least once between fate operations when
 +    // their isReady() returns > 0. Interleaving is not guaranteed, so we just check for one
 +    // occurrence which is highly unlikely to fail unless something is broken with FATE.
 +    // This test also ensures that the expected order of operations occurs per fate op.
 +    // Interleaving should have no effect on this.
 +
 +    final int numFateIds = 3;
 +    FateId[] fateIds = new FateId[numFateIds];
 +
 +    for (int i = 0; i < numFateIds; i++) {
 +      fateIds[i] = store.create();
 +      var txStore = store.reserve(fateIds[i]);
 +      try {
 +        txStore.push(new FirstOp());
 +        txStore.setTransactionInfo(TxInfo.TX_NAME, "TEST_" + i);
 +        txStore.setStatus(SUBMITTED);
 +      } finally {
 +        txStore.unreserve(Duration.ZERO);
 +      }
 +    }
 +
 +    Fate<FeoTestEnv> fate = null;
 +
 +    // The execution order of the transactions is not according to their insertion
 +    // order. However, we do know that the first step of each transaction will be
 +    // executed before the second steps.
 +    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
 +
 +      fate = initializeFate(client, store);
 +
 +      for (var fateId : fateIds) {
 +        waitFor(store, fateId);
 +      }
 +
 +      Scanner scanner = client.createScanner(FATE_TRACKING_TABLE);
 +      var iter = scanner.stream().map(FateExecutionOrderIT::toIdStep).iterator();
 +
 +      // we should see the following execution order for all fate ids:
 +      // FirstOp::isReady1, FirstOp::isReady2, FirstOp::call,
 +      // SecondOp::isReady1, SecondOp::isReady2, SecondOp::call,
 +      // LastOp::isReady1, LastOp::isReady2, LastOp::call
 +      // the first isReady of each op will defer the op to be executed later, allowing for the FATE
 +      // thread to interleave and work on another fate id, but may not always interleave.
 +      // It is unlikely that the FATE will not interleave at least once in a run, so we will check
 +      // for at least one occurrence.
 +      int interleaves = 0;
 +      int i = 0;
 +      Map.Entry<FateId,String> prevOp = null;
 +      var expRunOrder = List.of("FirstOp::isReady1", "FirstOp::isReady2", "FirstOp::call",
 +          "SecondOp::isReady1", "SecondOp::isReady2", "SecondOp::call", "LastOp::isReady1",
 +          "LastOp::isReady2", "LastOp::call");
 +      var fateIdsToExpRunOrder = Map.of(fateIds[0], new ArrayList<>(expRunOrder), fateIds[1],
 +          new ArrayList<>(expRunOrder), fateIds[2], new ArrayList<>(expRunOrder));
 +
 +      while (iter.hasNext()) {
 +        var currOp = iter.next();
 +        FateId fateId = currOp.getKey();
 +        String currStep = currOp.getValue();
 +        var expRunOrderFateId = fateIdsToExpRunOrder.get(fateId);
 +
 +        boolean passedFirstStep = !currStep.equals(expRunOrder.get(0));
 +        boolean prevFateIdDiffered = prevOp != null && !prevOp.getKey().equals(fateId);
 +        if (passedFirstStep && prevFateIdDiffered) {
 +          interleaves++;
 +        }
 +        assertEquals(currStep, expRunOrderFateId.remove(0));
 +        prevOp = currOp;
 +        i++;
 +      }
 +
 +      assertTrue(interleaves > 0);
 +      assertEquals(i, expRunOrder.size() * numFateIds);
 +      assertEquals(numFateIds, fateIdsToExpRunOrder.size());
 +      for (var expRunOrderFateId : fateIdsToExpRunOrder.values()) {
 +        assertTrue(expRunOrderFateId.isEmpty());
 +      }
 +
 +    } finally {
 +      if (fate != null) {
 +        fate.shutdown(10, TimeUnit.MINUTES);
 +      }
 +    }
 +  }
 +
 +  public static class FirstNonInterleavingOp extends FirstOp {
 +
 +    private static final long serialVersionUID = 1L;
 +
 +    @Override
 +    public long isReady(FateId tid, FeoTestEnv env) throws Exception {
 +      Thread.sleep(50);
 +      insertTrackingData(tid, env, this.getName() + "::isReady");
 +      return 0;
 +    }
 +
 +    @Override
 +    public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv manager) throws Exception {
 +      Thread.sleep(50);
 +      insertTrackingData(tid, manager, this.getName() + "::call");
 +      return new SecondNonInterleavingOp();
 +    }
 +  }
 +
 +  public static class SecondNonInterleavingOp extends FirstNonInterleavingOp {
 +
 +    private static final long serialVersionUID = 1L;
 +
 +    @Override
 +    public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv environment) throws Exception {
 +      super.call(tid, environment);
 +      return new LastNonInterleavingOp();
 +    }
 +
 +  }
 +
 +  public static class LastNonInterleavingOp extends FirstNonInterleavingOp {
 +
 +    private static final long serialVersionUID = 1L;
 +
 +    @Override
 +    public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv environment) throws Exception {
 +      super.call(tid, environment);
 +      return null;
 +    }
 +
 +  }
 +
 +  @Test
 +  public void testNonInterleaving() throws Exception {
 +    executeTest(this::testNonInterleaving);
 +  }
 +
 +  protected void testNonInterleaving(FateStore<FeoTestEnv> store, ServerContext sctx)
 +      throws Exception {
 +
 +    // This test ensures that when isReady() always returns zero, all the fate steps will
 +    // execute immediately
 +
 +    final int numFateIds = 3;
 +    FateId[] fateIds = new FateId[numFateIds];
 +
 +    for (int i = 0; i < numFateIds; i++) {
 +      fateIds[i] = store.create();
 +      var txStore = store.reserve(fateIds[i]);
 +      try {
 +        txStore.push(new FirstNonInterleavingOp());
 +        txStore.setTransactionInfo(TxInfo.TX_NAME, "TEST_" + i);
 +        txStore.setStatus(SUBMITTED);
 +      } finally {
 +        txStore.unreserve(Duration.ZERO);
 +      }
 +    }
 +
 +    Fate<FeoTestEnv> fate = null;
 +
 +    // The execution order of the transactions is not according to their insertion
 +    // order. In this case, without interleaving, a transaction will run start to finish
 +    // before moving on to the next transaction
 +    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
 +
 +      fate = initializeFate(client, store);
 +
 +      for (var fateId : fateIds) {
 +        waitFor(store, fateId);
 +      }
 +
 +      Scanner scanner = client.createScanner(FATE_TRACKING_TABLE);
 +      Iterator<Entry<Key,Value>> iter = scanner.iterator();
 +
 +      SortedMap<Key,Value> subset = new TreeMap<>();
 +
 +      // should see one fate op execute all of its steps
 +      var seenId1 = verifySameIds(iter, subset);
 +      // should see another fate op execute all of its steps
 +      var seenId2 = verifySameIds(iter, subset);
 +      // should see another fate op execute all of its steps
 +      var seenId3 = verifySameIds(iter, subset);
 +
 +      assertEquals(Set.of(fateIds[0], fateIds[1], fateIds[2]), Set.of(seenId1, seenId2, seenId3));
 +
 +      assertFalse(iter.hasNext());
 +
 +    } finally {
 +      if (fate != null) {
 +        fate.shutdown(10, TimeUnit.MINUTES);
 +      }
 +    }
 +  }
 +
 +  private FateId verifySameIds(Iterator<Entry<Key,Value>> iter, SortedMap<Key,Value> subset) {
 +    subset.clear();
 +    Iterators.limit(iter, 6).forEachRemaining(e -> subset.put(e.getKey(), e.getValue()));
 +
 +    Text fateId = subset.keySet().iterator().next().getColumnFamily();
 +    assertTrue(subset.keySet().stream().allMatch(k -> k.getColumnFamily().equals(fateId)));
 +
 +    // list is used to ensure correct operations and correct order of operations
 +    var expectedVals = List.of("FirstNonInterleavingOp::isReady", "FirstNonInterleavingOp::call",
 +        "SecondNonInterleavingOp::isReady", "SecondNonInterleavingOp::call",
 +        "LastNonInterleavingOp::isReady", "LastNonInterleavingOp::call");
 +    var actualVals = subset.values().stream().map(Value::toString).collect(Collectors.toList());
 +    assertEquals(expectedVals, actualVals);
 +
 +    return FateId.from(fateId.toString());
 +  }
 +
 +}
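Both tests above lean on the Repo contract: isReady() returning a positive value re-queues the transaction for at least that many milliseconds, freeing the worker thread to pick up a different fate id (the interleave being counted), while returning 0 lets call() run. A minimal sketch of that dispatch, written against the test's own types; runStep is a hypothetical helper, not Fate's actual implementation:

    // Hypothetical sketch of the isReady()/call() dispatch the tests rely on.
    static void runStep(Repo<FeoTestEnv> repo, FateId fateId, FeoTestEnv env,
        FateStore.FateTxStore<FeoTestEnv> txStore) throws Exception {
      long deferMillis = repo.isReady(fateId, env);
      if (deferMillis > 0) {
        // Not ready: release the reservation for at least deferMillis so this
        // worker thread can make progress on another fate id in the meantime.
        txStore.unreserve(Duration.ofMillis(deferMillis));
      } else {
        // Ready: run the step. A non-null result is the next repo to execute;
        // null means the transaction has completed its final step.
        Repo<FeoTestEnv> next = repo.call(fateId, env);
      }
    }

Note that the tests persist progress to the fate_tracking table rather than to repo fields: repos are persisted in the store between steps, so in-memory state cannot be assumed to survive a deferral.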
diff --cc test/src/main/java/org/apache/accumulo/test/functional/GracefulShutdownIT.java
index 0000000000,c8e4eec16f..c2d4f9575c
mode 000000,100644..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/GracefulShutdownIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/GracefulShutdownIT.java
@@@ -1,0 -1,273 +1,295 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   https://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.accumulo.test.functional;
+ 
+ import static org.junit.jupiter.api.Assertions.assertEquals;
++import static org.junit.jupiter.api.Assertions.assertNotNull;
+ import static org.junit.jupiter.api.Assertions.assertTrue;
+ 
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Map.Entry;
+ import java.util.Optional;
+ import java.util.Set;
+ import java.util.stream.IntStream;
+ 
 -import org.apache.accumulo.compactor.Compactor;
 -import org.apache.accumulo.coordinator.CompactionCoordinator;
+ import org.apache.accumulo.core.Constants;
+ import org.apache.accumulo.core.client.Accumulo;
+ import org.apache.accumulo.core.client.AccumuloClient;
+ import org.apache.accumulo.core.client.BatchWriter;
+ import org.apache.accumulo.core.client.IteratorSetting;
+ import org.apache.accumulo.core.client.Scanner;
+ import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel;
+ import org.apache.accumulo.core.client.admin.CompactionConfig;
+ import org.apache.accumulo.core.client.admin.NewTableConfiguration;
++import org.apache.accumulo.core.client.admin.servers.ServerId;
+ import org.apache.accumulo.core.conf.ClientProperty;
+ import org.apache.accumulo.core.conf.Property;
+ import org.apache.accumulo.core.data.Key;
+ import org.apache.accumulo.core.data.Mutation;
+ import org.apache.accumulo.core.data.Range;
+ import org.apache.accumulo.core.data.TableId;
+ import org.apache.accumulo.core.data.Value;
+ import org.apache.accumulo.core.lock.ServiceLock;
 -import org.apache.accumulo.core.lock.ServiceLock.ServiceLockPath;
+ import org.apache.accumulo.core.lock.ServiceLockData;
+ import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
++import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
++import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
+ import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
+ import org.apache.accumulo.core.security.Authorizations;
 -import org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner;
++import org.apache.accumulo.core.spi.compaction.RatioBasedCompactionPlanner;
+ import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher;
+ import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
+ import org.apache.accumulo.harness.SharedMiniClusterBase;
+ import org.apache.accumulo.minicluster.ServerType;
+ import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterControl;
+ import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+ import org.apache.accumulo.server.ServerContext;
+ import org.apache.accumulo.server.util.Admin;
+ import org.apache.accumulo.test.compaction.ExternalCompactionTestUtils;
+ import org.apache.accumulo.test.util.Wait;
 -import org.apache.accumulo.tserver.ScanServer;
+ import org.apache.hadoop.conf.Configuration;
+ import org.junit.jupiter.api.AfterAll;
+ import org.junit.jupiter.api.BeforeAll;
+ import org.junit.jupiter.api.Test;
+ 
+ import com.google.common.net.HostAndPort;
+ 
+ public class GracefulShutdownIT extends SharedMiniClusterBase {
+ 
+   private static final String GROUP_NAME = "graceful";
+ 
+   // @formatter:off
+   private static final String clientConfiguration =
+      "["+
+      " {"+
+      "   \"isDefault\": true,"+
+      "   \"maxBusyTimeout\": \"5m\","+
+      "   \"busyTimeoutMultiplier\": 8,"+
+      "   \"group\":" + GROUP_NAME + "," +
+      "   \"scanTypeActivations\": [graceful],"+
+      "   \"attemptPlans\": ["+
+      "     {"+
+      "       \"servers\": \"3\","+
+      "       \"busyTimeout\": \"33ms\","+
+      "       \"salt\": \"one\""+
+      "     }"+
+      "   ]"+
+      "  }"+
+      "]";
+   // @formatter:on
+ 
+   private static class GracefulShutdownITConfig implements MiniClusterConfigurationCallback {
+ 
+     @Override
+     public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) {
 -      cfg.setNumCompactors(0);
 -      cfg.setNumScanServers(0);
 -      cfg.setNumTservers(2);
 -      cfg.setProperty(Property.COMPACTION_COORDINATOR_FINALIZER_COMPLETION_CHECK_INTERVAL, "5s");
++      cfg.getClusterServerConfiguration().setNumDefaultCompactors(0);
++      cfg.getClusterServerConfiguration().setNumDefaultScanServers(0);
++      cfg.getClusterServerConfiguration().setNumDefaultTabletServers(2);
+       cfg.setProperty(Property.COMPACTION_COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL, "5s");
 -      cfg.setProperty(Property.COMPACTION_COORDINATOR_TSERVER_COMPACTION_CHECK_INTERVAL, "3s");
 -      cfg.setProperty(Property.COMPACTION_COORDINATOR_THRIFTCLIENT_PORTSEARCH, "true");
+       cfg.setProperty(Property.COMPACTOR_CANCEL_CHECK_INTERVAL, "5s");
+       cfg.setProperty(Property.COMPACTOR_PORTSEARCH, "true");
 -      cfg.setProperty("tserver.compaction.major.service." + GROUP_NAME + ".planner",
 -          DefaultCompactionPlanner.class.getName());
 -      cfg.setProperty("tserver.compaction.major.service." + GROUP_NAME + ".planner.opts.executors",
 -          "[{'name':'all', 'type': 'external', 'queue': '" + GROUP_NAME + "'}]");
++      cfg.setProperty(Property.COMPACTION_SERVICE_PREFIX.getKey() + GROUP_NAME + ".planner",
++          RatioBasedCompactionPlanner.class.getName());
++      cfg.setProperty(
++          Property.COMPACTION_SERVICE_PREFIX.getKey() + GROUP_NAME + ".planner.opts.groups",
++          "[{'group': '" + GROUP_NAME + "'}]");
+       cfg.setClientProperty(ClientProperty.SCAN_SERVER_SELECTOR_OPTS_PREFIX.getKey() + "profiles",
+           clientConfiguration);
+       // Timeout scan sessions after being idle for 3 seconds
+       cfg.setProperty(Property.TSERV_SESSION_MAXIDLE, "3s");
+     }
+   }
+ 
+   @BeforeAll
+   public static void startup() throws Exception {
+     SharedMiniClusterBase.startMiniClusterWithConfig(new GracefulShutdownITConfig());
+   }
+ 
+   @AfterAll
+   public static void shutdown() throws Exception {
+     SharedMiniClusterBase.stopMiniCluster();
+   }
+ 
+   @Test
+   public void testGracefulShutdown() throws Exception {
+ 
+     // Start ScanServers and Compactors using named groups
+     final MiniAccumuloClusterControl control = getCluster().getClusterControl();
+ 
+     try (final AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+       final ServerContext ctx = getCluster().getServerContext();
+       final String tableName = getUniqueNames(1)[0];
+ 
+       final NewTableConfiguration ntc = new NewTableConfiguration();
+       ntc.setProperties(Map.of(Property.TABLE_MAJC_RATIO.getKey(), "10",
+           "table.compaction.dispatcher", 
SimpleCompactionDispatcher.class.getName(),
+           "table.compaction.dispatcher.opts.service", GROUP_NAME));
+ 
+       client.tableOperations().create(tableName, ntc);
+       final TableId tid = ctx.getTableId(tableName);
+ 
+       // Insert 10 rows, flush after every row to create 10 files
+       try (BatchWriter writer = client.createBatchWriter(tableName)) {
+         for (int i : IntStream.rangeClosed(1, 10).toArray()) {
+           String val = i + "";
+           Mutation m = new Mutation(val);
+           m.put(val, val, val);
+           writer.addMutation(m);
+           writer.flush();
+           client.tableOperations().flush(tableName, null, null, true);
+         }
+       }
+       long numFiles = getNumFilesForTable(ctx, tid);
+       assertEquals(10, numFiles);
+       client.instanceOperations().waitForBalance();
+ 
+       // Restart Garbage Collector
+       final ServiceLockPath gcLockPath =
 -          ServiceLock.path(ctx.getZooKeeperRoot() + Constants.ZGC_LOCK);
++          getCluster().getServerContext().getServerPaths().getGarbageCollector(true);
+       Optional<ServiceLockData> data = ServiceLock.getLockData(ctx.getZooSession(), gcLockPath);
+       assertTrue(data.isPresent());
+       final HostAndPort gcAddress = data.orElseThrow().getAddress(ThriftService.GC);
+       assertTrue(!control.getProcesses(ServerType.GARBAGE_COLLECTOR).isEmpty());
+       // Don't call `new Admin().execute(new String[] {"signalShutdown", "-h ", host, "-p ",
+       // Integer.toString(port)})`
+       // because this poisons the SingletonManager and puts it into SERVER mode
+       Admin.signalGracefulShutdown(ctx, gcAddress.toString());
+       Wait.waitFor(() -> {
+         control.refreshProcesses(ServerType.GARBAGE_COLLECTOR);
+         return control.getProcesses(ServerType.GARBAGE_COLLECTOR).isEmpty();
+       });
+ 
+       // Restart Tablet Server
 -      final List<String> tservers = client.instanceOperations().getTabletServers();
++      final Set<ServiceLockPath> tservers = getCluster().getServerContext().getServerPaths()
++          .getTabletServer((rg) -> rg.equals(Constants.DEFAULT_RESOURCE_GROUP_NAME),
++              AddressSelector.all(), true);
+       assertEquals(2, tservers.size());
 -      final HostAndPort tserverAddress = HostAndPort.fromString(tservers.get(0));
++      final HostAndPort tserverAddress =
++          HostAndPort.fromString(tservers.iterator().next().getServer());
+       Admin.signalGracefulShutdown(ctx, tserverAddress.toString());
+       Wait.waitFor(() -> {
+         control.refreshProcesses(ServerType.TABLET_SERVER);
+         return control.getProcesses(ServerType.TABLET_SERVER).size() == 1;
+       });
+       client.instanceOperations().waitForBalance();
+       control.start(ServerType.TABLET_SERVER);
+       Wait.waitFor(() -> control.getProcesses(ServerType.TABLET_SERVER).size() == 2);
+       client.instanceOperations().waitForBalance();
+ 
+       // Restart Manager
 -      final List<String> managers = client.instanceOperations().getManagerLocations();
 -      assertEquals(1, managers.size());
 -      final HostAndPort managerAddress = HostAndPort.fromString(managers.get(0));
++      final ServiceLockPath manager =
++          getCluster().getServerContext().getServerPaths().getManager(true);
++      assertNotNull(manager);
++      Set<ServerId> managerLocations =
++          client.instanceOperations().getServers(ServerId.Type.MANAGER);
++      assertNotNull(managerLocations);
++      assertEquals(1, managerLocations.size());
++      final HostAndPort managerAddress =
++          HostAndPort.fromString(managerLocations.iterator().next().toHostPortString());
+       Admin.signalGracefulShutdown(ctx, managerAddress.toString());
+       Wait.waitFor(() -> {
+         control.refreshProcesses(ServerType.MANAGER);
+         return control.getProcesses(ServerType.MANAGER).isEmpty();
+       });
+       control.start(ServerType.MANAGER);
+       Wait.waitFor(() -> control.getProcesses(ServerType.MANAGER).size() == 1);
+       client.instanceOperations().waitForBalance();
+ 
+       // Compact table and shutdown compactor
 -      control.startCoordinator(CompactionCoordinator.class);
 -      getCluster().getConfig().setNumCompactors(1);
 -      control.startCompactors(Compactor.class, 1, GROUP_NAME);
 -      Wait.waitFor(() -> client.instanceOperations().getCompactors().size() == 1);
 -      final Set<String> compactors = client.instanceOperations().getCompactors();
 -      final HostAndPort compactorAddress = HostAndPort.fromString(compactors.iterator().next());
++      getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(GROUP_NAME,
++          1);
++      getCluster().getClusterControl().start(ServerType.COMPACTOR);
++
++      Wait.waitFor(() -> getCluster().getServerContext().getServerPaths()
++          .getCompactor((rg) -> rg.equals(GROUP_NAME), AddressSelector.all(), true).size() == 1);
++      final Set<ServiceLockPath> compactors = getCluster().getServerContext().getServerPaths()
++          .getCompactor((rg) -> rg.equals(GROUP_NAME), AddressSelector.all(), true);
++      final HostAndPort compactorAddress =
++          HostAndPort.fromString(compactors.iterator().next().getServer());
+ 
+       final CompactionConfig cc = new CompactionConfig();
+       final IteratorSetting is = new IteratorSetting(100, SlowIterator.class);
+       SlowIterator.setSeekSleepTime(is, 1000);
+       SlowIterator.setSleepTime(is, 1000);
+       cc.setIterators(List.of(is));
+       cc.setWait(false);
+ 
+       final long numFiles2 = getNumFilesForTable(ctx, tid);
+       assertEquals(numFiles2, numFiles);
 -      assertEquals(0, ExternalCompactionTestUtils.getRunningCompactions(ctx).getCompactionsSize());
++      Set<ServerId> newManagerLocations =
++          client.instanceOperations().getServers(ServerId.Type.MANAGER);
++      assertNotNull(newManagerLocations);
++      assertEquals(1, newManagerLocations.size());
++      final HostAndPort newManagerAddress =
++          HostAndPort.fromString(newManagerLocations.iterator().next().toHostPortString());
++      assertEquals(0, ExternalCompactionTestUtils
++          .getRunningCompactions(ctx, Optional.of(newManagerAddress)).getCompactionsSize());
+       client.tableOperations().compact(tableName, cc);
 -      Wait.waitFor(
 -          () -> ExternalCompactionTestUtils.getRunningCompactions(ctx).getCompactionsSize() > 0);
++      Wait.waitFor(() -> ExternalCompactionTestUtils
++          .getRunningCompactions(ctx, Optional.of(newManagerAddress)).getCompactionsSize() > 0);
+       Admin.signalGracefulShutdown(ctx, compactorAddress.toString());
+       Wait.waitFor(() -> {
+         control.refreshProcesses(ServerType.COMPACTOR);
+         return control.getProcesses(ServerType.COMPACTOR).isEmpty();
+       });
+       final long numFiles3 = getNumFilesForTable(ctx, tid);
+       assertTrue(numFiles3 < numFiles2);
+       assertEquals(1, numFiles3);
+ 
 -      getCluster().getConfig().setNumScanServers(1);
 -      control.startScanServer(ScanServer.class, 1, GROUP_NAME);
 -      Wait.waitFor(() -> client.instanceOperations().getScanServers().size() == 1);
 -      final Set<String> sservers = client.instanceOperations().getScanServers();
 -      final HostAndPort sserver = HostAndPort.fromString(sservers.iterator().next());
++      getCluster().getConfig().getClusterServerConfiguration()
++          .addScanServerResourceGroup(GROUP_NAME, 1);
++      getCluster().getClusterControl().start(ServerType.SCAN_SERVER);
++
++      Wait.waitFor(() -> getCluster().getServerContext().getServerPaths()
++          .getScanServer((rg) -> rg.equals(GROUP_NAME), AddressSelector.all(), true).size() == 1);
++      final Set<ServiceLockPath> sservers = getCluster().getServerContext().getServerPaths()
++          .getScanServer((rg) -> rg.equals(GROUP_NAME), AddressSelector.all(), true);
++      final HostAndPort sserver = HostAndPort.fromString(sservers.iterator().next().getServer());
+       try (final Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) {
+         scanner.setRange(new Range());
+         scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+         scanner.setExecutionHints(Map.of("scan_type", "graceful"));
+         scanner.addScanIterator(is); // add the slow iterator
+         scanner.setBatchSize(1);
+         int count = 0;
+         for (Entry<Key,Value> e : scanner) {
+           count++;
+           if (count == 2) {
+             Admin.signalGracefulShutdown(ctx, sserver.toString());
+           }
+         }
+         assertEquals(10, count);
+         Wait.waitFor(() -> {
+           control.refreshProcesses(ServerType.SCAN_SERVER);
+           return control.getProcesses(ServerType.SCAN_SERVER).isEmpty();
+         });
+ 
+       }
+ 
+     }
+ 
+   }
+ 
+   long getNumFilesForTable(ServerContext ctx, TableId tid) {
+     try (TabletsMetadata tablets = ctx.getAmple().readTablets().forTable(tid).build()) {
+       return tablets.stream().mapToLong(tm -> tm.getFiles().size()).sum();
+     }
+   }
+ }
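Each server type in the test follows the same two-step rhythm: send the graceful-shutdown signal to the server's advertised address, then poll the mini cluster's process table until the process disappears. A small sketch of that pattern as a reusable helper; shutdownAndAwait is a hypothetical name, not part of the test:

    // Hypothetical helper wrapping the signal-then-wait pattern used above.
    static void shutdownAndAwait(ServerContext ctx, MiniAccumuloClusterControl control,
        ServerType type, HostAndPort address) throws Exception {
      // Signal the server directly; see the comment in the test for why
      // Admin.execute(...) is avoided here.
      Admin.signalGracefulShutdown(ctx, address.toString());
      // Poll until the OS process has actually exited.
      Wait.waitFor(() -> {
        control.refreshProcesses(type);
        return control.getProcesses(type).isEmpty();
      });
    }

The tablet server step in the test deliberately waits for size() == 1 rather than an empty set, since only one of the two tablet servers is being stopped.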