This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit dc3a448af342412edff6aba59460998888f2e782
Merge: d64dda4c66 9c436cb466
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Mon Apr 14 18:14:57 2025 +0000

    Merge branch '2.1'

 .../accumulo/core/clientImpl/ClientContext.java    |  1 -
 .../java/org/apache/accumulo/core/fate/Fate.java   | 38 +++++++---------------
 .../org/apache/accumulo/compactor/Compactor.java   |  4 +--
 .../java/org/apache/accumulo/manager/Manager.java  |  4 +--
 .../org/apache/accumulo/test/fate/FastFate.java    |  4 ++-
 .../accumulo/test/fate/FateExecutionOrderIT.java   |  3 +-
 .../java/org/apache/accumulo/test/fate/FateIT.java |  3 +-
 .../accumulo/test/fate/FateOpsCommandsIT.java      |  3 +-
 .../org/apache/accumulo/test/fate/FlakyFate.java   |  3 +-
 .../accumulo/test/fate/MultipleStoresIT.java       | 12 ++++---
 10 files changed, 33 insertions(+), 42 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
index 6a5df1550c,c9e714f74c..da9becc64e
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
@@@ -1090,64 -1132,4 +1090,63 @@@ public class ClientContext implements A
      }
      return this.zkLockChecker;
    }
 +
 +  public ServiceLockPaths getServerPaths() {
 +    return this.serverPaths.get();
 +  }
 +
 +  public NamespaceMapping getNamespaces() {
 +    ensureOpen();
 +    return namespaces;
 +  }
 +
 +  public ClientTabletCache getTabletLocationCache(TableId tableId) {
 +    ensureOpen();
 +    return 
tabletLocationCache.get(DataLevel.of(tableId)).computeIfAbsent(tableId,
 +        (TableId key) -> {
 +          var lockChecker = getTServerLockChecker();
 +          if (AccumuloTable.ROOT.tableId().equals(tableId)) {
 +            return new RootClientTabletCache(lockChecker);
 +          }
 +          var mlo = new MetadataCachedTabletObtainer();
 +          if (AccumuloTable.METADATA.tableId().equals(tableId)) {
 +            return new ClientTabletCacheImpl(AccumuloTable.METADATA.tableId(),
 +                getTabletLocationCache(AccumuloTable.ROOT.tableId()), mlo, 
lockChecker);
 +          } else {
 +            return new ClientTabletCacheImpl(tableId,
 +                getTabletLocationCache(AccumuloTable.METADATA.tableId()), 
mlo, lockChecker);
 +          }
 +        });
 +  }
 +
 +  /**
 +   * Clear the currently cached tablet locations. The use of 
ConcurrentHashMap ensures this is
 +   * thread-safe. However, since the ConcurrentHashMap iterator is weakly 
consistent, it does not
 +   * block new locations from being cached. If new locations are added while 
this is executing, they
 +   * may be immediately invalidated by this code. Multiple calls to this 
method in different threads
 +   * may cause some location caches to be invalidated multiple times. That is 
okay, because cache
 +   * invalidation is idempotent.
 +   */
 +  public void clearTabletLocationCache() {
 +    tabletLocationCache.forEach((dataLevel, map) -> {
 +      // use iter.remove() instead of calling clear() on the map, to prevent 
clearing entries that
 +      // may not have been invalidated
 +      var iter = map.values().iterator();
 +      while (iter.hasNext()) {
 +        iter.next().invalidate();
 +        iter.remove();
 +      }
 +    });
 +  }
 +
 +  private static Set<String> createPersistentWatcherPaths() {
 +    Set<String> pathsToWatch = new HashSet<>();
 +    for (String path : Set.of(Constants.ZCOMPACTORS, Constants.ZDEADTSERVERS, 
Constants.ZGC_LOCK,
 +        Constants.ZMANAGER_LOCK, Constants.ZMINI_LOCK, 
Constants.ZMONITOR_LOCK,
 +        Constants.ZNAMESPACES, Constants.ZRECOVERY, Constants.ZSSERVERS, 
Constants.ZTABLES,
 +        Constants.ZTSERVERS, Constants.ZUSERS, RootTable.ZROOT_TABLET, 
Constants.ZTEST_LOCK)) {
 +      pathsToWatch.add(path);
 +    }
 +    return pathsToWatch;
 +  }
- 
  }
diff --cc core/src/main/java/org/apache/accumulo/core/fate/Fate.java
index 4658718480,aac1921914..560c7da0b2
--- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@@ -71,100 -58,21 +71,100 @@@ import com.google.gson.JsonParser
  public class Fate<T> {
  
    private static final Logger log = LoggerFactory.getLogger(Fate.class);
 -  private final Logger runnerLog = 
LoggerFactory.getLogger(TransactionRunner.class);
  
 -  private final TStore<T> store;
 -  private final T environment;
 -  private ExecutorService executor;
 +  private final FateStore<T> store;
-   private final ScheduledThreadPoolExecutor fatePoolsWatcher;
++  private final ScheduledFuture<?> fatePoolsWatcherFuture;
 +  private final AtomicInteger needMoreThreadsWarnCount = new AtomicInteger(0);
 +  private final ExecutorService deadResCleanerExecutor;
  
    private static final EnumSet<TStatus> FINISHED_STATES = EnumSet.of(FAILED, 
SUCCESSFUL, UNKNOWN);
 +  public static final Duration INITIAL_DELAY = Duration.ofSeconds(3);
 +  private static final Duration DEAD_RES_CLEANUP_DELAY = 
Duration.ofMinutes(3);
 +  private static final Duration POOL_WATCHER_DELAY = Duration.ofSeconds(30);
  
    private final AtomicBoolean keepRunning = new AtomicBoolean(true);
 +  private final Set<FateExecutor<T>> fateExecutors = new HashSet<>();
  
    public enum TxInfo {
 -    TX_NAME, AUTO_CLEAN, EXCEPTION, RETURN_VALUE
 +    FATE_OP, AUTO_CLEAN, EXCEPTION, TX_AGEOFF, RETURN_VALUE
 +  }
 +
 +  public enum FateOperation {
 +    COMMIT_COMPACTION(null),
 +    NAMESPACE_CREATE(TFateOperation.NAMESPACE_CREATE),
 +    NAMESPACE_DELETE(TFateOperation.NAMESPACE_DELETE),
 +    NAMESPACE_RENAME(TFateOperation.NAMESPACE_RENAME),
 +    SHUTDOWN_TSERVER(null),
 +    SYSTEM_SPLIT(null),
 +    SYSTEM_MERGE(null),
 +    TABLE_BULK_IMPORT2(TFateOperation.TABLE_BULK_IMPORT2),
 +    TABLE_CANCEL_COMPACT(TFateOperation.TABLE_CANCEL_COMPACT),
 +    TABLE_CLONE(TFateOperation.TABLE_CLONE),
 +    TABLE_COMPACT(TFateOperation.TABLE_COMPACT),
 +    TABLE_CREATE(TFateOperation.TABLE_CREATE),
 +    TABLE_DELETE(TFateOperation.TABLE_DELETE),
 +    TABLE_DELETE_RANGE(TFateOperation.TABLE_DELETE_RANGE),
 +    TABLE_EXPORT(TFateOperation.TABLE_EXPORT),
 +    TABLE_IMPORT(TFateOperation.TABLE_IMPORT),
 +    TABLE_MERGE(TFateOperation.TABLE_MERGE),
 +    TABLE_OFFLINE(TFateOperation.TABLE_OFFLINE),
 +    TABLE_ONLINE(TFateOperation.TABLE_ONLINE),
 +    TABLE_RENAME(TFateOperation.TABLE_RENAME),
 +    TABLE_SPLIT(TFateOperation.TABLE_SPLIT),
 +    TABLE_TABLET_AVAILABILITY(TFateOperation.TABLE_TABLET_AVAILABILITY);
 +
 +    private final TFateOperation top;
 +    private static final Set<FateOperation> nonThriftOps = 
Collections.unmodifiableSet(
 +        EnumSet.of(COMMIT_COMPACTION, SHUTDOWN_TSERVER, SYSTEM_SPLIT, 
SYSTEM_MERGE));
 +    private static final Set<FateOperation> allUserFateOps =
 +        Collections.unmodifiableSet(EnumSet.allOf(FateOperation.class));
 +    private static final Set<FateOperation> allMetaFateOps =
 +        Collections.unmodifiableSet(EnumSet.allOf(FateOperation.class));
 +
 +    FateOperation(TFateOperation top) {
 +      this.top = top;
 +    }
 +
 +    public static FateOperation fromThrift(TFateOperation top) {
 +      return FateOperation.valueOf(top.name());
 +    }
 +
 +    public static Set<FateOperation> getNonThriftOps() {
 +      return nonThriftOps;
 +    }
 +
 +    public TFateOperation toThrift() {
 +      if (top == null) {
 +        throw new IllegalStateException(this + " does not have an equivalent 
thrift form");
 +      }
 +      return top;
 +    }
 +
 +    public static Set<FateOperation> getAllUserFateOps() {
 +      return allUserFateOps;
 +    }
 +
 +    public static Set<FateOperation> getAllMetaFateOps() {
 +      return allMetaFateOps;
 +    }
    }
  
 -  private class TransactionRunner implements Runnable {
 +  // The fate pools watcher:
 +  // - Maintains a TransactionRunner per available thread per 
pool/FateExecutor. Does so by
 +  // periodically checking the pools for an inactive thread (i.e., a thread 
running a
 +  // TransactionRunner died or the pool size was increased in the property), 
resizing the pool and
 +  // submitting new runners as needed. Also safely stops the necessary number 
of TransactionRunners
 +  // if the pool size in the property was decreased.
 +  // - Warns the user to consider increasing the pool size (or splitting the 
fate ops assigned to
 +  // that pool into separate pools) for any pool that does not often have any 
idle threads.
 +  private class FatePoolsWatcher implements Runnable {
 +    private final T environment;
 +    private final AccumuloConfiguration conf;
 +
 +    private FatePoolsWatcher(T environment, AccumuloConfiguration conf) {
 +      this.environment = environment;
 +      this.conf = conf;
 +    }
  
      @Override
      public void run() {
@@@ -223,122 -166,77 +223,122 @@@
          }
        }
      }
 +  }
  
 -    private void transitionToFailed(long tid, Exception e) {
 -      String tidStr = FateTxId.formatTid(tid);
 -      final String msg = "Failed to execute Repo " + tidStr;
 -      // Certain FATE ops that throw exceptions don't need to be propagated 
up to the Monitor
 -      // as a warning. They're a normal, handled failure condition.
 -      if (e instanceof AcceptableException) {
 -        var tableOpEx = (AcceptableThriftTableOperationException) e;
 -        log.debug(msg + " for {}({}) {}", tableOpEx.getTableName(), 
tableOpEx.getTableId(),
 -            tableOpEx.getDescription());
 -      } else {
 -        log.warn(msg, e);
 +  /**
 +   * A thread that finds reservations held by dead processes and unreserves 
them
 +   */
 +  private class DeadReservationCleaner implements Runnable {
 +    @Override
 +    public void run() {
 +      if (keepRunning.get()) {
 +        store.deleteDeadReservations();
        }
 -      store.setTransactionInfo(tid, TxInfo.EXCEPTION, e);
 -      store.setStatus(tid, FAILED_IN_PROGRESS);
 -      log.info("Updated status for Repo with {} to FAILED_IN_PROGRESS", 
tidStr);
      }
 +  }
  
 -    private void processFailed(long tid, Repo<T> op) {
 -      while (op != null) {
 -        undo(tid, op);
 +  /**
 +   * Creates a Fault-tolerant executor for the given store type.
 +   *
 +   * @param runDeadResCleaner Whether this FATE should run a dead reservation 
cleaner. The real
 +   *        FATEs need to have a cleaner, but one may be undesirable in testing.
 +   * @param toLogStrFunc A function that converts Repo to Strings that are 
suitable for logging
 +   */
 +  public Fate(T environment, FateStore<T> store, boolean runDeadResCleaner,
-       Function<Repo<T>,String> toLogStrFunc, AccumuloConfiguration conf) {
++      Function<Repo<T>,String> toLogStrFunc, AccumuloConfiguration conf,
++      ScheduledThreadPoolExecutor genSchedExecutor) {
 +    this.store = FateLogger.wrap(store, toLogStrFunc, false);
 +
-     this.fatePoolsWatcher =
-         
ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(conf);
-     ThreadPools.watchCriticalScheduledTask(
-         fatePoolsWatcher.scheduleWithFixedDelay(new 
FatePoolsWatcher(environment, conf),
-             INITIAL_DELAY.toSeconds(), getPoolWatcherDelay().toSeconds(), 
SECONDS));
++    fatePoolsWatcherFuture =
++        genSchedExecutor.scheduleWithFixedDelay(new 
FatePoolsWatcher(environment, conf),
++            INITIAL_DELAY.toSeconds(), getPoolWatcherDelay().toSeconds(), 
SECONDS);
++    ThreadPools.watchCriticalScheduledTask(fatePoolsWatcherFuture);
 +
 +    ScheduledExecutorService deadResCleanerExecutor = null;
 +    if (runDeadResCleaner) {
 +      // Create a dead reservation cleaner for this store that will 
periodically clean up
 +      // reservations held by dead processes, if they exist.
 +      deadResCleanerExecutor = 
ThreadPools.getServerThreadPools().createScheduledExecutorService(1,
 +          store.type() == FateInstanceType.USER ? 
USER_DEAD_RESERVATION_CLEANER_POOL.poolName
 +              : META_DEAD_RESERVATION_CLEANER_POOL.poolName);
 +      ScheduledFuture<?> deadReservationCleaner =
 +          deadResCleanerExecutor.scheduleWithFixedDelay(new 
DeadReservationCleaner(),
 +              INITIAL_DELAY.toSeconds(), 
getDeadResCleanupDelay().toSeconds(), SECONDS);
 +      ThreadPools.watchCriticalScheduledTask(deadReservationCleaner);
 +    }
 +    this.deadResCleanerExecutor = deadResCleanerExecutor;
  
 -        store.pop(tid);
 -        op = store.top(tid);
 -      }
 +    startFateExecutors(environment, conf, fateExecutors);
 +  }
  
 -      store.setStatus(tid, FAILED);
 -      doCleanUp(tid);
 +  protected void startFateExecutors(T environment, AccumuloConfiguration conf,
 +      Set<FateExecutor<T>> fateExecutors) {
 +    for (var poolConf : getPoolConfigurations(conf).entrySet()) {
 +      // no fate threads are running at this point; fine not to synchronize
 +      fateExecutors
 +          .add(new FateExecutor<>(this, environment, poolConf.getKey(), 
poolConf.getValue()));
      }
 +  }
  
 -    private void doCleanUp(long tid) {
 -      Boolean autoClean = (Boolean) store.getTransactionInfo(tid, 
TxInfo.AUTO_CLEAN);
 -      if (autoClean != null && autoClean) {
 -        store.delete(tid);
 -      } else {
 -        // no longer need persisted operations, so delete them to save space 
in case
 -        // TX is never cleaned up...
 -        while (store.top(tid) != null) {
 -          store.pop(tid);
 -        }
 -      }
 +  /**
 +   * Returns a map of the current pool configurations as set in the given 
config. Each key is a set
 +   * of fate operations and each value is an integer for the number of 
threads assigned to work
 +   * those fate operations.
 +   */
 +  protected Map<Set<FateOperation>,Integer> 
getPoolConfigurations(AccumuloConfiguration conf) {
 +    Map<Set<FateOperation>,Integer> poolConfigs = new HashMap<>();
 +    final var json = 
JsonParser.parseString(conf.get(getFateConfigProp())).getAsJsonObject();
 +
 +    for (var entry : json.entrySet()) {
 +      var key = entry.getKey();
 +      var val = entry.getValue().getAsInt();
 +      var fateOpsStrArr = key.split(",");
 +      Set<FateOperation> fateOpsSet = 
Arrays.stream(fateOpsStrArr).map(FateOperation::valueOf)
 +          .collect(Collectors.toCollection(TreeSet::new));
 +
 +      poolConfigs.put(fateOpsSet, val);
      }
  
 -    private void undo(long tid, Repo<T> op) {
 -      try {
 -        op.undo(tid, environment);
 -      } catch (Exception e) {
 -        log.warn("Failed to undo Repo, " + FateTxId.formatTid(tid), e);
 -      }
 -    }
 +    return poolConfigs;
    }
  
 -  protected long executeIsReady(Long tid, Repo<T> op) throws Exception {
 -    var startTime = Timer.startNew();
 -    var deferTime = op.isReady(tid, environment);
 -    if (log.isTraceEnabled()) {
 -      log.trace("Running {}.isReady() {} took {} ms and returned {}", 
op.getName(),
 -          FateTxId.formatTid(tid), startTime.elapsed(MILLISECONDS), 
deferTime);
 -    }
 -    return deferTime;
 +  protected AtomicBoolean getKeepRunning() {
 +    return keepRunning;
 +  }
 +
 +  protected FateStore<T> getStore() {
 +    return store;
 +  }
 +
 +  protected Property getFateConfigProp() {
 +    return this.store.type() == FateInstanceType.USER ? 
Property.MANAGER_FATE_USER_CONFIG
 +        : Property.MANAGER_FATE_META_CONFIG;
 +  }
 +
 +  public Duration getDeadResCleanupDelay() {
 +    return DEAD_RES_CLEANUP_DELAY;
 +  }
 +
 +  public Duration getPoolWatcherDelay() {
 +    return POOL_WATCHER_DELAY;
    }
  
 -  protected Repo<T> executeCall(Long tid, Repo<T> op) throws Exception {
 -    var startTime = Timer.startNew();
 -    var next = op.call(tid, environment);
 -    if (log.isTraceEnabled()) {
 -      log.trace("Running {}.call() {} took {} ms and returned {}", 
op.getName(),
 -          FateTxId.formatTid(tid), startTime.elapsed(MILLISECONDS),
 -          next == null ? "null" : next.getName());
 +  /**
 +   * Returns the number of TransactionRunners active for the FateExecutor 
assigned to work on the
 +   * given set of fate operations. "Active" meaning it is waiting for a 
transaction to work on or
 +   * actively working on one. Returns 0 if no such FateExecutor exists. This 
should only be used for
 +   * testing
 +   */
 +  @VisibleForTesting
 +  public int getTxRunnersActive(Set<FateOperation> fateOps) {
 +    synchronized (fateExecutors) {
 +      for (var fateExecutor : fateExecutors) {
 +        if (fateExecutor.getFateOps().equals(fateOps)) {
 +          return fateExecutor.getRunningTxRunners().size();
 +        }
 +      }
      }
 -    return next;
 +    return 0;
    }
  
    /**
@@@ -480,102 -417,11 +480,86 @@@
    }
  
    /**
 -   * Flags that FATE threadpool to clear out and end. Does not actively stop 
running FATE processes.
 +   * Lists transactions for a given fate key type.
 +   */
 +  public Stream<FateKey> list(FateKey.FateKeyType type) {
 +    return store.list(type);
 +  }
 +
 +  /**
 +   * Initiates shutdown of background threads and optionally waits on them.
     */
 -  public void shutdown() {
 -    keepRunning.set(false);
 -    executor.shutdown();
 +  public void shutdown(long timeout, TimeUnit timeUnit) {
 +    log.info("Shutting down {} FATE", store.type());
 +
 +    if (keepRunning.compareAndSet(true, false)) {
 +      synchronized (fateExecutors) {
 +        for (var fateExecutor : fateExecutors) {
 +          fateExecutor.initiateShutdown();
 +        }
 +      }
 +      if (deadResCleanerExecutor != null) {
 +        deadResCleanerExecutor.shutdown();
 +      }
-       fatePoolsWatcher.shutdown();
++      fatePoolsWatcherFuture.cancel(false);
 +    }
 +
 +    if (timeout > 0) {
 +      long start = System.nanoTime();
 +      try {
 +        waitForAllFateExecShutdown(start, timeout, timeUnit);
 +        waitForDeadResCleanerShutdown(start, timeout, timeUnit);
-         waitForFatePoolsWatcherShutdown(start, timeout, timeUnit);
 +
-         if (anyFateExecutorIsAlive() || deadResCleanerIsAlive() || 
fatePoolsWatcherIsAlive()) {
++        if (anyFateExecutorIsAlive() || deadResCleanerIsAlive()) {
 +          log.warn(
 +              "Waited for {}ms for all fate {} background threads to stop, 
but some are still running. "
-                   + "fate executor threads:{} dead reservation cleaner 
thread:{} "
-                   + "fate pools watcher thread:{}",
++                  + "fate executor threads:{} dead reservation cleaner 
thread:{} ",
 +              TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start), 
store.type(),
-               anyFateExecutorIsAlive(), deadResCleanerIsAlive(), 
fatePoolsWatcherIsAlive());
++              anyFateExecutorIsAlive(), deadResCleanerIsAlive());
 +        }
 +      } catch (InterruptedException e) {
 +        throw new RuntimeException(e);
 +      }
 +    }
 +
 +    // interrupt the background threads
 +    synchronized (fateExecutors) {
 +      for (var fateExecutor : fateExecutors) {
 +        fateExecutor.shutdownNow();
 +        fateExecutor.getIdleCountHistory().clear();
 +      }
 +    }
 +    if (deadResCleanerExecutor != null) {
 +      deadResCleanerExecutor.shutdownNow();
 +    }
-     fatePoolsWatcher.shutdownNow();
 +  }
 +
 +  private boolean anyFateExecutorIsAlive() {
 +    synchronized (fateExecutors) {
 +      return fateExecutors.stream().anyMatch(FateExecutor::isAlive);
 +    }
 +  }
 +
 +  private boolean deadResCleanerIsAlive() {
 +    return deadResCleanerExecutor != null && 
!deadResCleanerExecutor.isTerminated();
    }
  
-   private boolean fatePoolsWatcherIsAlive() {
-     return !fatePoolsWatcher.isTerminated();
-   }
- 
 +  private void waitForAllFateExecShutdown(long start, long timeout, TimeUnit 
timeUnit)
 +      throws InterruptedException {
 +    synchronized (fateExecutors) {
 +      for (var fateExecutor : fateExecutors) {
 +        fateExecutor.waitForShutdown(start, timeout, timeUnit);
 +      }
 +    }
 +  }
 +
 +  private void waitForDeadResCleanerShutdown(long start, long timeout, 
TimeUnit timeUnit)
 +      throws InterruptedException {
 +    while (((System.nanoTime() - start) < timeUnit.toNanos(timeout)) && 
deadResCleanerIsAlive()) {
 +      if (deadResCleanerExecutor != null && 
!deadResCleanerExecutor.awaitTermination(1, SECONDS)) {
 +        log.debug("Fate {} is waiting for dead reservation cleaner thread to 
terminate",
 +            store.type());
 +      }
 +    }
 +  }
- 
-   private void waitForFatePoolsWatcherShutdown(long start, long timeout, 
TimeUnit timeUnit)
-       throws InterruptedException {
-     while (((System.nanoTime() - start) < timeUnit.toNanos(timeout)) && 
fatePoolsWatcherIsAlive()) {
-       if (!fatePoolsWatcher.awaitTermination(1, SECONDS)) {
-         log.debug("Fate {} is waiting for fate pools watcher thread to 
terminate", store.type());
-       }
-     }
-   }
  }
diff --cc server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 2bfc6b7276,9d95a73429..953b70f53b
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@@ -688,8 -686,8 +688,8 @@@ public class Compactor extends Abstract
      metricsInfo.init(getServiceTags(clientAddress));
  
      var watcher = new CompactionWatcher(getConfiguration());
-     var schedExecutor = ThreadPools.getServerThreadPools()
-         .createGeneralScheduledExecutorService(getConfiguration());
+     var schedExecutor = getContext().getScheduledExecutor();
 -    startGCLogger(schedExecutor);
++
      startCancelChecker(schedExecutor,
          
getConfiguration().getTimeInMillis(Property.COMPACTOR_CANCEL_CHECK_INTERVAL));
  
diff --cc server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 4c6d75755b,8cb51d160b..78e5c017c9
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@@ -1487,16 -1494,18 +1487,16 @@@ public class Manager extends AbstractSe
      }
    }
  
 -  @Deprecated
 -  private void initializeZkForReplication(ZooReaderWriter zReaderWriter, 
String zroot) {
 -    try {
 -      org.apache.accumulo.server.replication.ZooKeeperInitialization
 -          .ensureZooKeeperInitialized(zReaderWriter, zroot);
 -    } catch (KeeperException | InterruptedException e) {
 -      throw new IllegalStateException("Exception while ensuring ZooKeeper is 
initialized", e);
 -    }
 -  }
 +  protected Fate<Manager> initializeFateInstance(ServerContext context, 
FateStore<Manager> store) {
 +
-     final Fate<Manager> fateInstance =
-         new Fate<>(this, store, true, TraceRepo::toLogString, 
getConfiguration());
++    final Fate<Manager> fateInstance = new Fate<>(this, store, true, 
TraceRepo::toLogString,
++        getConfiguration(), context.getScheduledExecutor());
  
 -  protected Fate<Manager> initializeFateInstance(TStore<Manager> store) {
 -    return new Fate<>(this, store, TraceRepo::toLogString);
 +    var fateCleaner = new FateCleaner<>(store, Duration.ofHours(8), 
this::getSteadyTime);
 +    ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor()
 +        .scheduleWithFixedDelay(fateCleaner::ageOff, 10, 4 * 60, MINUTES));
 +
 +    return fateInstance;
    }
  
    /**
diff --cc test/src/main/java/org/apache/accumulo/test/fate/FastFate.java
index e33906f29c,0000000000..f23596569f
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/FastFate.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FastFate.java
@@@ -1,52 -1,0 +1,54 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.test.fate;
 +
 +import java.time.Duration;
++import java.util.concurrent.ScheduledThreadPoolExecutor;
 +import java.util.function.Function;
 +
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.fate.Fate;
 +import org.apache.accumulo.core.fate.FateStore;
 +import org.apache.accumulo.core.fate.Repo;
 +
 +/**
 + * A FATE which performs the dead reservation cleanup and the check on the 
pool size with a much
 + * shorter delay between runs. Useful for shortening test times for tests that are 
waiting for one of
 + * these actions to occur.
 + */
 +public class FastFate<T> extends Fate<T> {
 +  private static final Duration DEAD_RES_CLEANUP_DELAY = 
Duration.ofSeconds(5);
 +  private static final Duration POOL_WATCHER_DELAY = Duration.ofSeconds(5);
 +
 +  public FastFate(T environment, FateStore<T> store, boolean 
runDeadResCleaner,
 +      Function<Repo<T>,String> toLogStrFunc, AccumuloConfiguration conf) {
-     super(environment, store, runDeadResCleaner, toLogStrFunc, conf);
++    super(environment, store, runDeadResCleaner, toLogStrFunc, conf,
++        new ScheduledThreadPoolExecutor(2));
 +  }
 +
 +  @Override
 +  public Duration getDeadResCleanupDelay() {
 +    return DEAD_RES_CLEANUP_DELAY;
 +  }
 +
 +  @Override
 +  public Duration getPoolWatcherDelay() {
 +    return POOL_WATCHER_DELAY;
 +  }
 +}
diff --cc test/src/main/java/org/apache/accumulo/test/fate/FateExecutionOrderIT.java
index ba01be9990,0000000000..da3b76a242
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/FateExecutionOrderIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FateExecutionOrderIT.java
@@@ -1,418 -1,0 +1,419 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.test.fate;
 +
 +import static 
org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUBMITTED;
 +import static 
org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUCCESSFUL;
 +import static org.apache.accumulo.test.fate.FateTestUtil.TEST_FATE_OP;
 +import static org.junit.jupiter.api.Assertions.assertEquals;
 +import static org.junit.jupiter.api.Assertions.assertFalse;
 +import static org.junit.jupiter.api.Assertions.assertTrue;
 +
 +import java.time.Duration;
 +import java.util.AbstractMap;
 +import java.util.ArrayList;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +import java.util.SortedMap;
 +import java.util.TreeMap;
++import java.util.concurrent.ScheduledThreadPoolExecutor;
 +import java.util.concurrent.TimeUnit;
 +import java.util.stream.Collectors;
 +
 +import org.apache.accumulo.core.client.Accumulo;
 +import org.apache.accumulo.core.client.AccumuloClient;
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.MutationsRejectedException;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.client.admin.NewTableConfiguration;
 +import org.apache.accumulo.core.client.admin.TabletAvailability;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.fate.Fate;
 +import org.apache.accumulo.core.fate.Fate.TxInfo;
 +import org.apache.accumulo.core.fate.FateId;
 +import org.apache.accumulo.core.fate.FateStore;
 +import org.apache.accumulo.core.fate.Repo;
 +import org.apache.accumulo.harness.SharedMiniClusterBase;
 +import org.apache.accumulo.server.ServerContext;
 +import org.apache.hadoop.io.Text;
 +import org.junit.jupiter.api.AfterAll;
 +import org.junit.jupiter.api.BeforeAll;
 +import org.junit.jupiter.api.BeforeEach;
 +import org.junit.jupiter.api.Test;
 +
 +import com.google.common.collect.Iterators;
 +
 +public abstract class FateExecutionOrderIT extends SharedMiniClusterBase
 +    implements FateTestRunner<FateExecutionOrderIT.FeoTestEnv> {
 +
 +  public static class FeoTestEnv extends FateTestRunner.TestEnv {
 +    private final AccumuloClient client;
 +
 +    public FeoTestEnv(AccumuloClient client) {
 +      this.client = client;
 +    }
 +
 +    AccumuloClient getClient() {
 +      return client;
 +    }
 +  }
 +
 +  public static class FirstOp implements 
Repo<FateExecutionOrderIT.FeoTestEnv> {
 +
 +    private static final long serialVersionUID = 1L;
 +
 +    protected boolean isTrackingDataSet(FateId tid, FeoTestEnv env, String 
step) throws Exception {
 +      try (Scanner scanner = 
env.getClient().createScanner(FATE_TRACKING_TABLE)) {
 +        return scanner.stream()
 +            .anyMatch(e -> 
e.getKey().getColumnFamily().toString().equals(tid.canonical())
 +                && e.getValue().toString().equals(step));
 +      }
 +    }
 +
 +    protected static void insertTrackingData(FateId tid, FeoTestEnv env, 
String step)
 +        throws TableNotFoundException, MutationsRejectedException {
 +      try (BatchWriter bw = 
env.getClient().createBatchWriter(FATE_TRACKING_TABLE)) {
 +        Mutation mut = new 
Mutation(Long.toString(System.currentTimeMillis()));
 +        mut.put(tid.canonical(), "", step);
 +        bw.addMutation(mut);
 +      }
 +    }
 +
 +    @Override
 +    public long isReady(FateId tid, FeoTestEnv env) throws Exception {
 +      // First call to isReady will return that it's not ready (defer time of 
100ms), inserting
 +      // the data 'isReady1' so we know isReady was called once. The second 
attempt (after the
 +      // deferral time) will pass as ready (return 0) and insert the data 
'isReady2' so we know
 +      // the second call to isReady was made
 +      Thread.sleep(50);
 +      var step = this.getName() + "::isReady";
 +      if (isTrackingDataSet(tid, env, step + "1")) {
 +        insertTrackingData(tid, env, step + "2");
 +        return 0;
 +      } else {
 +        insertTrackingData(tid, env, step + "1");
 +        return 100;
 +      }
 +    }
 +
 +    @Override
 +    public String getName() {
 +      return this.getClass().getSimpleName();
 +    }
 +
 +    @Override
 +    public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv env) throws Exception 
{
 +      Thread.sleep(50);
 +      insertTrackingData(tid, env, this.getName() + "::call");
 +      return new SecondOp();
 +    }
 +
 +    @Override
 +    public void undo(FateId fateId, FeoTestEnv environment) throws Exception {
 +      throw new UnsupportedOperationException();
 +    }
 +
 +    @Override
 +    public String getReturn() {
 +      return "";
 +    }
 +  }
 +
 +  public static class SecondOp extends FirstOp {
 +    private static final long serialVersionUID = 1L;
 +
 +    @Override
 +    public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv environment) throws 
Exception {
 +      super.call(tid, environment);
 +      return new LastOp();
 +    }
 +
 +  }
 +
 +  public static class LastOp extends FirstOp {
 +    private static final long serialVersionUID = 1L;
 +
 +    @Override
 +    public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv environment) throws 
Exception {
 +      super.call(tid, environment);
 +      return null;
 +    }
 +  }
 +
 +  private static final String FATE_TRACKING_TABLE = "fate_tracking";
 +
 +  @BeforeAll
 +  public static void setup() throws Exception {
 +    SharedMiniClusterBase.startMiniCluster();
 +    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
 +      NewTableConfiguration ntc = new NewTableConfiguration();
 +      ntc.withInitialTabletAvailability(TabletAvailability.HOSTED);
 +      client.tableOperations().create(FATE_TRACKING_TABLE, ntc);
 +    }
 +  }
 +
 +  @AfterAll
 +  public static void teardown() throws Exception {
 +    SharedMiniClusterBase.stopMiniCluster();
 +  }
 +
 +  @BeforeEach
 +  public void before() throws Exception {
 +    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
 +      client.tableOperations().deleteRows(FATE_TRACKING_TABLE, null, null);
 +    }
 +  }
 +
 +  private void waitFor(FateStore<FeoTestEnv> store, FateId txid) throws 
Exception {
 +    while (store.read(txid).getStatus() != SUCCESSFUL) {
 +      Thread.sleep(50);
 +    }
 +  }
 +
 +  protected Fate<FeoTestEnv> initializeFate(AccumuloClient client, 
FateStore<FeoTestEnv> store) {
 +    return new Fate<>(new FeoTestEnv(client), store, false, r -> r + "",
-         FateTestUtil.createTestFateConfig(1));
++        FateTestUtil.createTestFateConfig(1), new 
ScheduledThreadPoolExecutor(2));
 +  }
 +
 +  private static Entry<FateId,String> toIdStep(Entry<Key,Value> e) {
 +    return new AbstractMap.SimpleImmutableEntry<>(
 +        FateId.from(e.getKey().getColumnFamily().toString()), 
e.getValue().toString());
 +  }
 +
 +  @Test
 +  public void testInterleaving() throws Exception {
 +    executeTest(this::testInterleaving);
 +  }
 +
 +  protected void testInterleaving(FateStore<FeoTestEnv> store, ServerContext 
sctx)
 +      throws Exception {
 +
 +    // This test verifies that FATE will interleave at least once between 
fate operations when
 +    // their isReady() returns > 0. Interleaving is not guaranteed, so we 
just check for one
 +    // occurrence which is highly unlikely to fail unless something is broken 
with FATE.
 +    // This test also ensures that the expected order of operations occurs 
per fate op.
 +    // Interleaving should have no effect on this.
 +
 +    final int numFateIds = 3;
 +    FateId[] fateIds = new FateId[numFateIds];
 +
 +    for (int i = 0; i < numFateIds; i++) {
 +      fateIds[i] = store.create();
 +      var txStore = store.reserve(fateIds[i]);
 +      try {
 +        txStore.push(new FirstOp());
 +        txStore.setTransactionInfo(TxInfo.FATE_OP, TEST_FATE_OP);
 +        txStore.setStatus(SUBMITTED);
 +      } finally {
 +        txStore.unreserve(Duration.ZERO);
 +      }
 +    }
 +
 +    Fate<FeoTestEnv> fate = null;
 +
 +    // The execution order of the transactions is not according to their 
insertion
 +    // order. However, we do know that the first step of each transaction 
will be
 +    // executed before the second steps.
 +    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
 +
 +      fate = initializeFate(client, store);
 +
 +      for (var fateId : fateIds) {
 +        waitFor(store, fateId);
 +      }
 +
 +      Scanner scanner = client.createScanner(FATE_TRACKING_TABLE);
 +      var iter = 
scanner.stream().map(FateExecutionOrderIT::toIdStep).iterator();
 +
 +      // we should see the following execution order for all fate ids:
 +      // FirstOp::isReady1, FirstOp::isReady2, FirstOp::call,
 +      // SecondOp::isReady1, SecondOp::isReady2, SecondOp::call,
 +      // LastOp::isReady1, LastOp::isReady2, LastOp::call
 +      // the first isReady of each op will defer the op to be executed later, 
allowing for the FATE
 +      // thread to interleave and work on another fate id, but may not always 
interleave.
 +      // It is unlikely that the FATE will not interleave at least once in a 
run, so we will check
 +      // for at least one occurrence.
 +      int interleaves = 0;
 +      int i = 0;
 +      Map.Entry<FateId,String> prevOp = null;
 +      var expRunOrder = List.of("FirstOp::isReady1", "FirstOp::isReady2", 
"FirstOp::call",
 +          "SecondOp::isReady1", "SecondOp::isReady2", "SecondOp::call", 
"LastOp::isReady1",
 +          "LastOp::isReady2", "LastOp::call");
 +      var fateIdsToExpRunOrder = Map.of(fateIds[0], new 
ArrayList<>(expRunOrder), fateIds[1],
 +          new ArrayList<>(expRunOrder), fateIds[2], new 
ArrayList<>(expRunOrder));
 +
 +      while (iter.hasNext()) {
 +        var currOp = iter.next();
 +        FateId fateId = currOp.getKey();
 +        String currStep = currOp.getValue();
 +        var expRunOrderFateId = fateIdsToExpRunOrder.get(fateId);
 +
 +        boolean passedFirstStep = !currStep.equals(expRunOrder.get(0));
 +        boolean prevFateIdDiffered = prevOp != null && 
!prevOp.getKey().equals(fateId);
 +        if (passedFirstStep && prevFateIdDiffered) {
 +          interleaves++;
 +        }
 +        assertEquals(currStep, expRunOrderFateId.remove(0));
 +        prevOp = currOp;
 +        i++;
 +      }
 +
 +      assertTrue(interleaves > 0);
 +      assertEquals(i, expRunOrder.size() * numFateIds);
 +      assertEquals(numFateIds, fateIdsToExpRunOrder.size());
 +      for (var expRunOrderFateId : fateIdsToExpRunOrder.values()) {
 +        assertTrue(expRunOrderFateId.isEmpty());
 +      }
 +
 +    } finally {
 +      if (fate != null) {
 +        fate.shutdown(10, TimeUnit.MINUTES);
 +      }
 +    }
 +  }
 +
 +  public static class FirstNonInterleavingOp extends FirstOp {
 +
 +    private static final long serialVersionUID = 1L;
 +
 +    @Override
 +    public long isReady(FateId tid, FeoTestEnv env) throws Exception {
 +      Thread.sleep(50);
 +      insertTrackingData(tid, env, this.getName() + "::isReady");
 +      return 0;
 +    }
 +
 +    @Override
 +    public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv manager) throws 
Exception {
 +      Thread.sleep(50);
 +      insertTrackingData(tid, manager, this.getName() + "::call");
 +      return new SecondNonInterleavingOp();
 +    }
 +  }
 +
 +  public static class SecondNonInterleavingOp extends FirstNonInterleavingOp {
 +
 +    private static final long serialVersionUID = 1L;
 +
 +    @Override
 +    public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv environment) throws 
Exception {
 +      super.call(tid, environment);
 +      return new LastNonInterleavingOp();
 +    }
 +
 +  }
 +
 +  public static class LastNonInterleavingOp extends FirstNonInterleavingOp {
 +
 +    private static final long serialVersionUID = 1L;
 +
 +    @Override
 +    public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv environment) throws 
Exception {
 +      super.call(tid, environment);
 +      return null;
 +    }
 +
 +  }
 +
 +  @Test
 +  public void testNonInterleaving() throws Exception {
 +    executeTest(this::testNonInterleaving);
 +  }
 +
 +  protected void testNonInterleaving(FateStore<FeoTestEnv> store, 
ServerContext sctx)
 +      throws Exception {
 +
 +    // This test ensures that when isReady() always returns zero that all the 
fate steps will
 +    // execute immediately
 +
 +    final int numFateIds = 3;
 +    FateId[] fateIds = new FateId[numFateIds];
 +
 +    for (int i = 0; i < numFateIds; i++) {
 +      fateIds[i] = store.create();
 +      var txStore = store.reserve(fateIds[i]);
 +      try {
 +        txStore.push(new FirstNonInterleavingOp());
 +        txStore.setTransactionInfo(TxInfo.FATE_OP, TEST_FATE_OP);
 +        txStore.setStatus(SUBMITTED);
 +      } finally {
 +        txStore.unreserve(Duration.ZERO);
 +      }
 +    }
 +
 +    Fate<FeoTestEnv> fate = null;
 +
 +    // The execution order of the transactions is not according to their 
insertion
 +    // order. In this case, without interleaving, a transaction will run 
start to finish
 +    // before moving on to the next transaction
 +    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
 +
 +      fate = initializeFate(client, store);
 +
 +      for (var fateId : fateIds) {
 +        waitFor(store, fateId);
 +      }
 +
 +      Scanner scanner = client.createScanner(FATE_TRACKING_TABLE);
 +      Iterator<Entry<Key,Value>> iter = scanner.iterator();
 +
 +      SortedMap<Key,Value> subset = new TreeMap<>();
 +
 +      // should see one fate op execute all of its steps
 +      var seenId1 = verifySameIds(iter, subset);
 +      // should see another fate op execute all of its steps
 +      var seenId2 = verifySameIds(iter, subset);
 +      // should see another fate op execute all of its steps
 +      var seenId3 = verifySameIds(iter, subset);
 +
 +      assertEquals(Set.of(fateIds[0], fateIds[1], fateIds[2]), 
Set.of(seenId1, seenId2, seenId3));
 +
 +      assertFalse(iter.hasNext());
 +
 +    } finally {
 +      if (fate != null) {
 +        fate.shutdown(10, TimeUnit.MINUTES);
 +      }
 +    }
 +  }
 +
 +  private FateId verifySameIds(Iterator<Entry<Key,Value>> iter, 
SortedMap<Key,Value> subset) {
 +    subset.clear();
 +    Iterators.limit(iter, 6).forEachRemaining(e -> subset.put(e.getKey(), 
e.getValue()));
 +
 +    Text fateId = subset.keySet().iterator().next().getColumnFamily();
 +    assertTrue(subset.keySet().stream().allMatch(k -> 
k.getColumnFamily().equals(fateId)));
 +
 +    // list is used to ensure correct operations and correct order of 
operations
 +    var expectedVals = List.of("FirstNonInterleavingOp::isReady", 
"FirstNonInterleavingOp::call",
 +        "SecondNonInterleavingOp::isReady", "SecondNonInterleavingOp::call",
 +        "LastNonInterleavingOp::isReady", "LastNonInterleavingOp::call");
 +    var actualVals = 
subset.values().stream().map(Value::toString).collect(Collectors.toList());
 +    assertEquals(expectedVals, actualVals);
 +
 +    return FateId.from(fateId.toString());
 +  }
 +
 +}
diff --cc test/src/main/java/org/apache/accumulo/test/fate/FateIT.java
index bb70e6dffd,0000000000..fab396d8ca
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java
@@@ -1,567 -1,0 +1,568 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.test.fate;
 +
 +import static java.util.concurrent.TimeUnit.SECONDS;
 +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED;
 +import static 
org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED_IN_PROGRESS;
 +import static 
org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.IN_PROGRESS;
 +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.NEW;
 +import static 
org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUBMITTED;
 +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.UNKNOWN;
 +import static org.apache.accumulo.test.fate.FateTestUtil.TEST_FATE_OP;
 +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 +import static org.junit.jupiter.api.Assertions.assertEquals;
 +import static org.junit.jupiter.api.Assertions.assertFalse;
 +import static org.junit.jupiter.api.Assertions.assertThrows;
 +import static org.junit.jupiter.api.Assertions.assertTrue;
 +import static org.junit.jupiter.api.Assertions.fail;
 +
 +import java.util.ArrayList;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Set;
 +import java.util.concurrent.CountDownLatch;
++import java.util.concurrent.ScheduledThreadPoolExecutor;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.concurrent.atomic.AtomicLong;
 +
 +import org.apache.accumulo.core.fate.AbstractFateStore;
 +import org.apache.accumulo.core.fate.Fate;
 +import org.apache.accumulo.core.fate.FateId;
 +import org.apache.accumulo.core.fate.FateStore;
 +import org.apache.accumulo.core.fate.ReadOnlyFateStore;
 +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
 +import org.apache.accumulo.core.fate.Repo;
 +import org.apache.accumulo.harness.SharedMiniClusterBase;
 +import org.apache.accumulo.server.ServerContext;
 +import org.apache.accumulo.test.fate.FateTestRunner.TestEnv;
 +import org.apache.accumulo.test.util.Wait;
 +import org.junit.jupiter.api.Test;
 +import org.junit.jupiter.api.Timeout;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +public abstract class FateIT extends SharedMiniClusterBase implements 
FateTestRunner<TestEnv> {
 +
 +  private static final Logger LOG = LoggerFactory.getLogger(FateIT.class);
 +
 +  private static CountDownLatch callStarted;
 +  private static CountDownLatch finishCall;
 +  private static CountDownLatch undoLatch;
 +
 +  private enum ExceptionLocation {
 +    CALL, IS_READY
 +  }
 +
 +  public static class TestRepo implements Repo<TestEnv> {
 +    private static final long serialVersionUID = 1L;
 +
 +    private final String data;
 +
 +    public TestRepo() {
 +      this("test");
 +    }
 +
 +    public TestRepo(String data) {
 +      this.data = data;
 +    }
 +
 +    @Override
 +    public long isReady(FateId fateId, TestEnv environment) throws Exception {
 +      return 0;
 +    }
 +
 +    @Override
 +    public String getName() {
 +      return "TestRepo_" + data;
 +    }
 +
 +    @Override
 +    public Repo<TestEnv> call(FateId fateId, TestEnv environment) throws 
Exception {
 +      LOG.debug("Entering call {}", fateId);
 +      try {
 +        FateIT.inCall();
 +        return null;
 +      } finally {
 +        LOG.debug("Leaving call {}", fateId);
 +      }
 +    }
 +
 +    @Override
 +    public void undo(FateId fateId, TestEnv environment) throws Exception {
 +
 +    }
 +
 +    @Override
 +    public String getReturn() {
 +      return data + "_ret";
 +    }
 +  }
 +
 +  public static class TestOperationFails implements Repo<TestEnv> {
 +    private static final long serialVersionUID = 1L;
 +    private static final Logger LOG = 
LoggerFactory.getLogger(TestOperationFails.class);
 +    private static List<String> undoOrder = new ArrayList<>();
 +    private static final int TOTAL_NUM_OPS = 3;
 +    private int opNum;
 +    private final String opName;
 +    private final ExceptionLocation location;
 +
 +    public TestOperationFails(int opNum, ExceptionLocation location) {
 +      this.opNum = opNum;
 +      this.opName = "OP" + opNum;
 +      this.location = location;
 +    }
 +
 +    @Override
 +    public long isReady(FateId fateId, TestEnv environment) throws Exception {
 +      LOG.debug("{} {} Entered isReady()", opName, fateId);
 +      if (location == ExceptionLocation.IS_READY) {
 +        if (opNum < TOTAL_NUM_OPS) {
 +          return 0;
 +        } else {
 +          throw new Exception(opName + " " + fateId + " isReady() failed - 
this is expected");
 +        }
 +      } else {
 +        return 0;
 +      }
 +    }
 +
 +    @Override
 +    public String getName() {
 +      return getClass().getName();
 +    }
 +
 +    @Override
 +    public void undo(FateId fateId, TestEnv environment) throws Exception {
 +      LOG.debug("{} {} Entered undo()", opName, fateId);
 +      undoOrder.add(opName);
 +      undoLatch.countDown();
 +    }
 +
 +    @Override
 +    public Repo<TestEnv> call(FateId fateId, TestEnv environment) throws 
Exception {
 +      LOG.debug("{} {} Entered call()", opName, fateId);
 +      if (location == ExceptionLocation.CALL) {
 +        if (opNum < TOTAL_NUM_OPS) {
 +          return new TestOperationFails(++opNum, location);
 +        } else {
 +          throw new Exception(opName + " " + fateId + " call() failed - this 
is expected");
 +        }
 +      } else {
 +        return new TestOperationFails(++opNum, location);
 +      }
 +    }
 +
 +    @Override
 +    public String getReturn() {
 +      return "none";
 +    }
 +  }
 +
 +  /**
 +   * Test Repo that allows configuring a delay time to be returned in 
isReady().
 +   */
 +  public static class DeferredTestRepo implements Repo<TestEnv> {
 +    private static final long serialVersionUID = 1L;
 +
 +    private final String data;
 +
 +    // These are static as we don't want to serialize them and they should
 +    // be shared across all instances during the test
 +    private static final AtomicInteger executedCalls = new AtomicInteger();
 +    private static final AtomicLong delay = new AtomicLong();
 +    private static final CountDownLatch callLatch = new CountDownLatch(1);
 +
 +    public DeferredTestRepo(String data) {
 +      this.data = data;
 +    }
 +
 +    @Override
 +    public long isReady(FateId fateId, TestEnv environment) {
 +      LOG.debug("{} delayed {}", fateId, delay.get());
 +      return delay.get();
 +    }
 +
 +    @Override
 +    public String getName() {
 +      return "TestRepo_" + data;
 +    }
 +
 +    @Override
 +    public Repo<TestEnv> call(FateId fateId, TestEnv environment) throws 
Exception {
 +      callLatch.await();
 +      LOG.debug("Executing call {}, total executed {}", fateId, 
executedCalls.incrementAndGet());
 +      return null;
 +    }
 +
 +    @Override
 +    public void undo(FateId fateId, TestEnv environment) {
 +
 +    }
 +
 +    @Override
 +    public String getReturn() {
 +      return data + "_ret";
 +    }
 +  }
 +
 +  @Test
 +  @Timeout(30)
 +  public void testTransactionStatus() throws Exception {
 +    executeTest(this::testTransactionStatus);
 +  }
 +
 +  protected void testTransactionStatus(FateStore<TestEnv> store, 
ServerContext sctx)
 +      throws Exception {
 +    Fate<TestEnv> fate = initializeFate(store);
 +    try {
 +
 +      // Wait for the transaction runner to be scheduled.
 +      Thread.sleep(Fate.INITIAL_DELAY.toMillis() * 2);
 +
 +      callStarted = new CountDownLatch(1);
 +      finishCall = new CountDownLatch(1);
 +
 +      FateId fateId = fate.startTransaction();
 +      assertEquals(TStatus.NEW, getTxStatus(sctx, fateId));
 +      fate.seedTransaction(TEST_FATE_OP, fateId, new 
TestRepo("testTransactionStatus"), true,
 +          "Test Op");
 +      assertEquals(TStatus.SUBMITTED, getTxStatus(sctx, fateId));
 +      // wait for call() to be called
 +      callStarted.await();
 +      assertEquals(IN_PROGRESS, getTxStatus(sctx, fateId));
 +      // tell the op to exit the method
 +      finishCall.countDown();
 +      // Check that it transitions to SUCCESSFUL and then removed (UNKNOWN)
 +      final var sawSuccess = new AtomicBoolean(false);
 +      Wait.waitFor(() -> {
 +        TStatus s;
 +        switch (s = getTxStatus(sctx, fateId)) {
 +          case IN_PROGRESS:
 +            if (sawSuccess.get()) {
 +              fail("Should never see IN_PROGRESS after seeing SUCCESSFUL");
 +            }
 +            break;
 +          case SUCCESSFUL:
 +            // expected, but might be too quick to be detected
 +            if (sawSuccess.compareAndSet(false, true)) {
 +              LOG.debug("Saw expected transaction status change to 
SUCCESSFUL");
 +            }
 +            break;
 +          case UNKNOWN:
 +            if (!sawSuccess.get()) {
 +              LOG.debug("Never saw transaction status change to SUCCESSFUL, 
but that's okay");
 +            }
 +            return true;
 +          default:
 +            fail("Saw unexpected status: " + s);
 +        }
 +        // keep waiting for UNKNOWN
 +        return false;
 +      }, SECONDS.toMillis(30), 10);
 +    } finally {
 +      fate.shutdown(10, TimeUnit.MINUTES);
 +    }
 +  }
 +
 +  @Test
 +  public void testCancelWhileNew() throws Exception {
 +    executeTest(this::testCancelWhileNew);
 +  }
 +
 +  protected void testCancelWhileNew(FateStore<TestEnv> store, ServerContext 
sctx) throws Exception {
 +    Fate<TestEnv> fate = initializeFate(store);
 +    try {
 +
 +      // Wait for the transaction runner to be scheduled.
 +      Thread.sleep(Fate.INITIAL_DELAY.toMillis() * 2);
 +
 +      callStarted = new CountDownLatch(1);
 +      finishCall = new CountDownLatch(1);
 +
 +      FateId fateId = fate.startTransaction();
 +      LOG.debug("Starting test testCancelWhileNew with {}", fateId);
 +      assertEquals(NEW, getTxStatus(sctx, fateId));
 +      // cancel the transaction
 +      assertTrue(fate.cancel(fateId));
 +      assertTrue(
 +          FAILED_IN_PROGRESS == getTxStatus(sctx, fateId) || FAILED == 
getTxStatus(sctx, fateId));
 +      fate.seedTransaction(TEST_FATE_OP, fateId, new 
TestRepo("testCancelWhileNew"), true,
 +          "Test Op");
 +      Wait.waitFor(() -> FAILED == getTxStatus(sctx, fateId));
 +      // nothing should have run
 +      assertEquals(1, callStarted.getCount());
 +      fate.delete(fateId);
 +      assertEquals(UNKNOWN, getTxStatus(sctx, fateId));
 +    } finally {
 +      fate.shutdown(10, TimeUnit.MINUTES);
 +    }
 +  }
 +
 +  @Test
 +  public void testCancelWhileSubmittedAndRunning() throws Exception {
 +    executeTest(this::testCancelWhileSubmittedAndRunning);
 +  }
 +
 +  protected void testCancelWhileSubmittedAndRunning(FateStore<TestEnv> store, 
ServerContext sctx)
 +      throws Exception {
 +    Fate<TestEnv> fate = initializeFate(store);
 +    try {
 +
 +      // Wait for the transaction runner to be scheduled.
 +      Thread.sleep(Fate.INITIAL_DELAY.toMillis() * 2);
 +
 +      callStarted = new CountDownLatch(1);
 +      finishCall = new CountDownLatch(1);
 +
 +      FateId fateId = fate.startTransaction();
 +      LOG.debug("Starting test testCancelWhileSubmitted with {}", fateId);
 +      assertEquals(NEW, getTxStatus(sctx, fateId));
 +      fate.seedTransaction(TEST_FATE_OP, fateId, new 
TestRepo("testCancelWhileSubmittedAndRunning"),
 +          false, "Test Op");
 +      Wait.waitFor(() -> IN_PROGRESS == getTxStatus(sctx, fateId));
 +      // This is false because the transaction runner has reserved the FaTe
 +      // transaction.
 +      assertFalse(fate.cancel(fateId));
 +      callStarted.await();
 +      finishCall.countDown();
 +      Wait.waitFor(() -> IN_PROGRESS != getTxStatus(sctx, fateId));
 +      fate.delete(fateId);
 +      assertEquals(UNKNOWN, getTxStatus(sctx, fateId));
 +    } finally {
 +      fate.shutdown(10, TimeUnit.MINUTES);
 +    }
 +  }
 +
 +  @Test
 +  public void testCancelWhileInCall() throws Exception {
 +    executeTest(this::testCancelWhileInCall);
 +  }
 +
 +  protected void testCancelWhileInCall(FateStore<TestEnv> store, 
ServerContext sctx)
 +      throws Exception {
 +    Fate<TestEnv> fate = initializeFate(store);
 +    try {
 +
 +      // Wait for the transaction runner to be scheduled.
 +      Thread.sleep(Fate.INITIAL_DELAY.toMillis() * 2);
 +
 +      callStarted = new CountDownLatch(1);
 +      finishCall = new CountDownLatch(1);
 +
 +      FateId fateId = fate.startTransaction();
 +      LOG.debug("Starting test testCancelWhileInCall with {}", fateId);
 +      assertEquals(NEW, getTxStatus(sctx, fateId));
 +      fate.seedTransaction(TEST_FATE_OP, fateId, new 
TestRepo("testCancelWhileInCall"), true,
 +          "Test Op");
 +      assertEquals(SUBMITTED, getTxStatus(sctx, fateId));
 +      // wait for call() to be called
 +      callStarted.await();
 +      // cancel the transaction
 +      assertFalse(fate.cancel(fateId));
 +      finishCall.countDown();
 +    } finally {
 +      fate.shutdown(10, TimeUnit.MINUTES);
 +    }
 +
 +  }
 +
 +  @Test
 +  @Timeout(30)
 +  public void testDeferredOverflow() throws Exception {
 +    // Set a maximum deferred map size of 10 transactions so that when the 
11th
 +    // is seen the Fate store should clear the deferred map and mark
 +    // the flag as overflow so that all the deferred transactions will be run
 +    executeTest(this::testDeferredOverflow, 10, 
AbstractFateStore.DEFAULT_FATE_ID_GENERATOR);
 +  }
 +
 +  protected void testDeferredOverflow(FateStore<TestEnv> store, ServerContext 
sctx)
 +      throws Exception {
 +    Fate<TestEnv> fate = initializeFate(store);
 +    try {
 +
 +      // Wait for the transaction runner to be scheduled.
 +      Thread.sleep(Fate.INITIAL_DELAY.toMillis() * 2);
 +
 +      DeferredTestRepo.executedCalls.set(0);
 +      // Initialize the repo to have a delay of 30 seconds
 +      // so it will be deferred when submitted
 +      DeferredTestRepo.delay.set(30000);
 +
 +      Set<FateId> transactions = new HashSet<>();
 +
 +      // Start by creating 10 transactions that are all deferred which should
 +      // fill up the deferred map with all 10 as we set the max deferred limit
 +      // to only allow 10 transactions
 +      for (int i = 0; i < 10; i++) {
 +        submitDeferred(fate, sctx, transactions);
 +      }
 +
 +      // Verify all 10 are deferred in the map.
 +      // We should not be in an overflow state yet.
 +      Wait.waitFor(() -> store.getDeferredCount() == 10);
 +      assertFalse(store.isDeferredOverflow());
 +
 +      // After verifying all 10 are deferred, submit another 10
 +      // which should trigger an overflow. We are blocking in the
 +      // call method of DeferredTestRepo at this point using a countdown
 +      // latch to prevent fate executor from running early and clearing
 +      // the deferred overflow flag before we can check it below
 +      for (int i = 0; i < 10; i++) {
 +        submitDeferred(fate, sctx, transactions);
 +      }
 +      // Verify deferred overflow is true and map is now empty
 +      Wait.waitFor(() -> store.getDeferredCount() == 0);
 +      Wait.waitFor(store::isDeferredOverflow);
 +
 +      // Set the delay to 0 and countdown so we will process the
 +      // call method in the repos. We need to change the delay because
 +      // due to the async nature of Fate it's possible some of the submitted
 +      // repos previously wouldn't be processed in the first batch until
 +      // after the flag was cleared which would trigger a long delay again
 +      DeferredTestRepo.delay.set(0);
 +      DeferredTestRepo.callLatch.countDown();
 +
 +      // Verify the flag was cleared and everything ran
 +      Wait.waitFor(() -> !store.isDeferredOverflow());
 +      Wait.waitFor(() -> DeferredTestRepo.executedCalls.get() == 20);
 +
 +      // Verify all 20 unique transactions finished
 +      Wait.waitFor(() -> {
 +        transactions.removeIf(fateId -> getTxStatus(sctx, fateId) == UNKNOWN);
 +        return transactions.isEmpty();
 +      });
 +
 +    } finally {
 +      fate.shutdown(10, TimeUnit.MINUTES);
 +    }
 +  }
 +
 +  @Test
 +  @Timeout(30)
 +  public void testRepoFails() throws Exception {
 +    // Uses the same store settings as testDeferredOverflow (a maximum deferred
 +    // map size of 10 transactions). Deferred overflow is not exercised here;
 +    // this test verifies that undo() is called on each Repo, in the correct
 +    // order, when a Repo's call() or isReady() throws
 +    executeTest(this::testRepoFails, 10, 
AbstractFateStore.DEFAULT_FATE_ID_GENERATOR);
 +  }
 +
 +  /**
 +   * Verifies that an exception thrown from a Repo's call() or isReady() causes undo() to be
 +   * invoked back up the chain of Repos, and that the transaction ends up FAILED.
 +   *
 +   * @param store the Fate store under test
 +   * @param sctx server context used to look up transaction status
 +   * @throws Exception if interrupted while waiting or if the Fate operations fail
 +   */
 +  protected void testRepoFails(FateStore<TestEnv> store, ServerContext sctx) throws Exception {
 +    /*
 +     * This test ensures that when an exception occurs in a Repo's call() or isReady() methods, that
 +     * undo() will be called back up the chain of Repo's and in the correct order. The test works as
 +     * follows: 1) Repo1 is called and returns Repo2, 2) Repo2 is called and returns Repo3, 3) Repo3
 +     * is called and throws an exception (in call() or isReady()). It is then expected that: 1)
 +     * undo() is called on Repo3, 2) undo() is called on Repo2, 3) undo() is called on Repo1
 +     */
 +    Fate<TestEnv> fate = initializeFate(store);
 +    try {
 +
 +      // Wait for the transaction runner to be scheduled.
 +      Thread.sleep(Fate.INITIAL_DELAY.toMillis() * 2);
 +
 +      // Test exception in call()
 +      assertUndoOrderOnFailure(fate, sctx, ExceptionLocation.CALL, "call() failed");
 +      // Test exception in isReady()
 +      assertUndoOrderOnFailure(fate, sctx, ExceptionLocation.IS_READY, "isReady() failed");
 +    } finally {
 +      fate.shutdown(10, TimeUnit.MINUTES);
 +    }
 +  }
 +
 +  // Runs one failing transaction and checks the undo order, the final status and the error text.
 +  private void assertUndoOrderOnFailure(Fate<TestEnv> fate, ServerContext sctx,
 +      ExceptionLocation location, String expectedMessage) throws Exception {
 +    // Reset the shared recording state before each scenario
 +    TestOperationFails.undoOrder = new ArrayList<>();
 +    undoLatch = new CountDownLatch(TestOperationFails.TOTAL_NUM_OPS);
 +    FateId fateId = fate.startTransaction();
 +    assertEquals(NEW, getTxStatus(sctx, fateId));
 +    fate.seedTransaction(TEST_FATE_OP, fateId, new TestOperationFails(1, location), false,
 +        "Test Op Fails");
 +    // Wait for all the undo() calls to complete
 +    undoLatch.await();
 +    assertEquals(List.of("OP3", "OP2", "OP1"), TestOperationFails.undoOrder);
 +    assertEquals(FAILED, fate.waitForCompletion(fateId));
 +    assertTrue(fate.getException(fateId).getMessage().contains(expectedMessage));
 +  }
 +
 +  @Test
 +  @Timeout(30)
 +  public void testNoWriteAfterDelete() throws Exception {
 +    executeTest(this::testNoWriteAfterDelete);
 +  }
 +
 +  /**
 +   * Verifies that write operations on a reserved transaction succeed until the transaction is
 +   * deleted, and that the same write operations throw once it has been deleted.
 +   *
 +   * @param store the Fate store used to create and reserve the transaction
 +   * @param sctx server context; not referenced by this test
 +   */
 +  protected void testNoWriteAfterDelete(FateStore<TestEnv> store, 
ServerContext sctx)
 +      throws Exception {
 +    final FateId fateId = store.create();
 +    final Repo<TestEnv> repo = new TestRepo("testNoWriteAfterDelete");
 +
 +    var txStore = store.reserve(fateId);
 +
 +    // all write ops should be ok after reservation
 +    assertDoesNotThrow(() -> txStore.push(repo));
 +    assertDoesNotThrow(() -> 
txStore.setStatus(ReadOnlyFateStore.TStatus.SUCCESSFUL));
 +    assertDoesNotThrow(txStore::pop);
 +    assertDoesNotThrow(() -> txStore.setTransactionInfo(Fate.TxInfo.FATE_OP, 
TEST_FATE_OP));
 +    // delete goes last: it is the operation that invalidates the writes below
 +    assertDoesNotThrow(txStore::delete);
 +
 +    // test that all write ops result in an exception since the tx has been 
deleted
 +    assertThrows(Exception.class, () -> txStore.push(repo));
 +    assertThrows(Exception.class, () -> 
txStore.setStatus(ReadOnlyFateStore.TStatus.SUCCESSFUL));
 +    assertThrows(Exception.class, txStore::pop);
 +    assertThrows(Exception.class,
 +        () -> txStore.setTransactionInfo(Fate.TxInfo.FATE_OP, TEST_FATE_OP));
 +    assertThrows(Exception.class, txStore::delete);
 +  }
 +
 +  private void submitDeferred(Fate<TestEnv> fate, ServerContext sctx, 
Set<FateId> transactions) {
 +    FateId fateId = fate.startTransaction();
 +    transactions.add(fateId);
 +    assertEquals(TStatus.NEW, getTxStatus(sctx, fateId));
 +    fate.seedTransaction(TEST_FATE_OP, fateId, new 
DeferredTestRepo("testDeferredOverflow"), true,
 +        "Test Op");
 +    assertEquals(TStatus.SUBMITTED, getTxStatus(sctx, fateId));
 +  }
 +
 +  protected Fate<TestEnv> initializeFate(FateStore<TestEnv> store) {
 +    return new Fate<>(new TestEnv(), store, false, r -> r + "",
-         FateTestUtil.createTestFateConfig(1));
++        FateTestUtil.createTestFateConfig(1), new 
ScheduledThreadPoolExecutor(2));
 +  }
 +
 +  protected abstract TStatus getTxStatus(ServerContext sctx, FateId fateId);
 +
 +  /**
 +   * Counts down {@code callStarted} to signal that the call has begun, then blocks on
 +   * {@code finishCall} until it is released.
 +   *
 +   * @throws InterruptedException if interrupted while waiting on {@code finishCall}
 +   */
 +  private static void inCall() throws InterruptedException {
 +    // signal that call started
 +    callStarted.countDown();
 +    // wait for the signal to exit the method
 +    finishCall.await();
 +  }
 +}
diff --cc 
test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java
index f887fd4062,0000000000..86c2f3e3a9
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java
@@@ -1,881 -1,0 +1,882 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.test.fate;
 +
 +import static org.apache.accumulo.test.fate.FateTestUtil.TEST_FATE_OP;
 +import static org.easymock.EasyMock.expect;
 +import static org.easymock.EasyMock.replay;
 +import static org.easymock.EasyMock.verify;
 +import static org.junit.jupiter.api.Assertions.assertEquals;
 +import static org.junit.jupiter.api.Assertions.assertFalse;
 +import static org.junit.jupiter.api.Assertions.assertNotEquals;
 +import static org.junit.jupiter.api.Assertions.assertNotNull;
 +import static org.junit.jupiter.api.Assertions.assertTrue;
 +import static org.junit.jupiter.api.Assertions.fail;
 +
 +import java.io.IOException;
 +import java.lang.reflect.Method;
 +import java.time.Duration;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.EnumSet;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Optional;
 +import java.util.Set;
 +import java.util.UUID;
++import java.util.concurrent.ScheduledThreadPoolExecutor;
 +import java.util.concurrent.TimeUnit;
 +import java.util.function.Predicate;
 +import java.util.stream.Collectors;
 +
 +import org.apache.accumulo.core.client.Accumulo;
 +import org.apache.accumulo.core.client.AccumuloClient;
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.client.admin.NewTableConfiguration;
 +import org.apache.accumulo.core.clientImpl.ClientContext;
 +import org.apache.accumulo.core.conf.DefaultConfiguration;
 +import org.apache.accumulo.core.fate.AbstractFateStore;
 +import org.apache.accumulo.core.fate.AdminUtil;
 +import org.apache.accumulo.core.fate.Fate;
 +import org.apache.accumulo.core.fate.FateId;
 +import org.apache.accumulo.core.fate.FateInstanceType;
 +import org.apache.accumulo.core.fate.FateStore;
 +import org.apache.accumulo.core.fate.ReadOnlyFateStore;
 +import org.apache.accumulo.core.fate.user.UserFateStore;
 +import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
 +import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
 +import org.apache.accumulo.core.iterators.IteratorUtil;
 +import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
 +import org.apache.accumulo.core.metadata.AccumuloTable;
 +import org.apache.accumulo.core.zookeeper.ZooSession;
 +import org.apache.accumulo.minicluster.ServerType;
 +import 
org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo;
 +import org.apache.accumulo.server.ServerContext;
 +import org.apache.accumulo.server.util.Admin;
 +import org.apache.accumulo.server.util.fateCommand.FateSummaryReport;
 +import org.apache.accumulo.server.util.fateCommand.FateTxnDetails;
 +import org.apache.accumulo.test.fate.MultipleStoresIT.LatchTestEnv;
 +import org.apache.accumulo.test.fate.MultipleStoresIT.LatchTestRepo;
 +import org.apache.accumulo.test.functional.ConfigurableMacBase;
 +import org.apache.accumulo.test.functional.ReadWriteIT;
 +import org.apache.accumulo.test.functional.SlowIterator;
 +import org.apache.accumulo.test.util.Wait;
 +import org.easymock.EasyMock;
 +import org.junit.jupiter.api.BeforeEach;
 +import org.junit.jupiter.api.Test;
 +
 +public abstract class FateOpsCommandsIT extends ConfigurableMacBase
 +    implements FateTestRunner<LatchTestEnv> {
 +
 +  @Override
 +  protected Duration defaultTimeout() {
 +    return Duration.ofMinutes(3);
 +  }
 +
 +  @BeforeEach
 +  public void beforeEachSetup() throws Exception {
 +    // Occasionally, the summary/print cmds will see a COMMIT_COMPACTION 
transaction which was
 +    // initiated on starting the manager, causing the test to fail. Stopping 
the compactor fixes
 +    // this issue.
 +    getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
 +    Wait.waitFor(() -> getServerContext().getServerPaths()
 +        .getCompactor(rg -> true, AddressSelector.all(), true).isEmpty(), 
60_000);
 +  }
 +
 +  @Test
 +  public void testFateSummaryCommand() throws Exception {
 +    executeTest(this::testFateSummaryCommand);
 +  }
 +
 +  protected void testFateSummaryCommand(FateStore<LatchTestEnv> store, 
ServerContext sctx)
 +      throws Exception {
 +    // Configure Fate
 +    Fate<LatchTestEnv> fate = initFateNoDeadResCleaner(store);
 +
 +    // validate blank report, no transactions have started
 +    ProcessInfo p = getCluster().exec(Admin.class, "fate", "--summary", "-j");
 +    assertEquals(0, p.getProcess().waitFor());
 +    String result = p.readStdOut();
 +    result = result.lines().filter(line -> 
!line.matches(".*(INFO|DEBUG|WARN|ERROR).*"))
 +        .collect(Collectors.joining("\n"));
 +    FateSummaryReport report = FateSummaryReport.fromJson(result);
 +    assertNotNull(report);
 +    assertNotEquals(0, report.getReportTime());
 +    assertTrue(report.getStatusCounts().isEmpty());
 +    assertTrue(report.getStepCounts().isEmpty());
 +    assertTrue(report.getCmdCounts().isEmpty());
 +    assertTrue(report.getStatusFilterNames().isEmpty());
 +    assertTrue(report.getInstanceTypesFilterNames().isEmpty());
 +    assertTrue(report.getFateIdFilter().isEmpty());
 +    validateFateDetails(report.getFateDetails(), 0, null);
 +
 +    // create Fate transactions
 +    FateId fateId1 = fate.startTransaction();
 +    FateId fateId2 = fate.startTransaction();
 +    List<String> fateIdsStarted = List.of(fateId1.canonical(), 
fateId2.canonical());
 +
 +    // validate no filters
 +    p = getCluster().exec(Admin.class, "fate", "--summary", "-j");
 +    assertEquals(0, p.getProcess().waitFor());
 +    result = p.readStdOut();
 +    result = result.lines().filter(line -> 
!line.matches(".*(INFO|DEBUG|WARN|ERROR).*"))
 +        .collect(Collectors.joining("\n"));
 +    report = FateSummaryReport.fromJson(result);
 +    assertNotNull(report);
 +    assertNotEquals(0, report.getReportTime());
 +    assertFalse(report.getStatusCounts().isEmpty());
 +    assertFalse(report.getStepCounts().isEmpty());
 +    assertFalse(report.getCmdCounts().isEmpty());
 +    assertTrue(report.getStatusFilterNames().isEmpty());
 +    assertTrue(report.getInstanceTypesFilterNames().isEmpty());
 +    assertTrue(report.getFateIdFilter().isEmpty());
 +    validateFateDetails(report.getFateDetails(), 2, fateIdsStarted);
 +
 +    /*
 +     * Test filtering by FateIds
 +     */
 +
 +    // validate filtering by both transactions
 +    p = getCluster().exec(Admin.class, "fate", fateId1.canonical(), 
fateId2.canonical(),
 +        "--summary", "-j");
 +    assertEquals(0, p.getProcess().waitFor());
 +    result = p.readStdOut();
 +    result = result.lines().filter(line -> 
!line.matches(".*(INFO|DEBUG|WARN|ERROR).*"))
 +        .collect(Collectors.joining("\n"));
 +    report = FateSummaryReport.fromJson(result);
 +    assertNotNull(report);
 +    assertNotEquals(0, report.getReportTime());
 +    assertFalse(report.getStatusCounts().isEmpty());
 +    assertFalse(report.getStepCounts().isEmpty());
 +    assertFalse(report.getCmdCounts().isEmpty());
 +    assertTrue(report.getStatusFilterNames().isEmpty());
 +    assertTrue(report.getInstanceTypesFilterNames().isEmpty());
 +    assertEquals(2, report.getFateIdFilter().size());
 +    assertTrue(report.getFateIdFilter().containsAll(fateIdsStarted));
 +    validateFateDetails(report.getFateDetails(), 2, fateIdsStarted);
 +
 +    // validate filtering by just one transaction
 +    p = getCluster().exec(Admin.class, "fate", fateId1.canonical(), 
"--summary", "-j");
 +    assertEquals(0, p.getProcess().waitFor());
 +    result = p.readStdOut();
 +    result = result.lines().filter(line -> 
!line.matches(".*(INFO|DEBUG|WARN|ERROR).*"))
 +        .collect(Collectors.joining("\n"));
 +    report = FateSummaryReport.fromJson(result);
 +    assertNotNull(report);
 +    assertNotEquals(0, report.getReportTime());
 +    assertFalse(report.getStatusCounts().isEmpty());
 +    assertFalse(report.getStepCounts().isEmpty());
 +    assertFalse(report.getCmdCounts().isEmpty());
 +    assertTrue(report.getStatusFilterNames().isEmpty());
 +    assertTrue(report.getInstanceTypesFilterNames().isEmpty());
 +    assertEquals(1, report.getFateIdFilter().size());
 +    assertTrue(report.getFateIdFilter().contains(fateId1.canonical()));
 +    validateFateDetails(report.getFateDetails(), 1, fateIdsStarted);
 +
 +    // validate filtering by non-existent transaction
 +    FateId fakeFateId = FateId.from(store.type(), UUID.randomUUID());
 +    p = getCluster().exec(Admin.class, "fate", fakeFateId.canonical(), 
"--summary", "-j");
 +    assertEquals(0, p.getProcess().waitFor());
 +    result = p.readStdOut();
 +    result = result.lines().filter(line -> 
!line.matches(".*(INFO|DEBUG|WARN|ERROR).*"))
 +        .collect(Collectors.joining("\n"));
 +    report = FateSummaryReport.fromJson(result);
 +    assertNotNull(report);
 +    assertNotEquals(0, report.getReportTime());
 +    assertFalse(report.getStatusCounts().isEmpty());
 +    assertFalse(report.getStepCounts().isEmpty());
 +    assertFalse(report.getCmdCounts().isEmpty());
 +    assertTrue(report.getStatusFilterNames().isEmpty());
 +    assertTrue(report.getInstanceTypesFilterNames().isEmpty());
 +    assertEquals(1, report.getFateIdFilter().size());
 +    assertTrue(report.getFateIdFilter().contains(fakeFateId.canonical()));
 +    validateFateDetails(report.getFateDetails(), 0, fateIdsStarted);
 +
 +    /*
 +     * Test filtering by States
 +     */
 +
 +    // validate status filter by including only FAILED transactions, should 
be none
 +    p = getCluster().exec(Admin.class, "fate", "--summary", "-j", "-s", 
"FAILED");
 +    assertEquals(0, p.getProcess().waitFor());
 +    result = p.readStdOut();
 +    result = result.lines().filter(line -> 
!line.matches(".*(INFO|DEBUG|WARN|ERROR).*"))
 +        .collect(Collectors.joining("\n"));
 +    report = FateSummaryReport.fromJson(result);
 +    assertNotNull(report);
 +    assertNotEquals(0, report.getReportTime());
 +    assertFalse(report.getStatusCounts().isEmpty());
 +    assertFalse(report.getStepCounts().isEmpty());
 +    assertFalse(report.getCmdCounts().isEmpty());
 +    assertEquals(Set.of("FAILED"), report.getStatusFilterNames());
 +    assertTrue(report.getInstanceTypesFilterNames().isEmpty());
 +    assertTrue(report.getFateIdFilter().isEmpty());
 +    validateFateDetails(report.getFateDetails(), 0, fateIdsStarted);
 +
 +    // validate status filter by including only NEW transactions, should be 2
 +    p = getCluster().exec(Admin.class, "fate", "--summary", "-j", "-s", 
"NEW");
 +    assertEquals(0, p.getProcess().waitFor());
 +    result = p.readStdOut();
 +    result = result.lines().filter(line -> 
!line.matches(".*(INFO|DEBUG|WARN|ERROR).*"))
 +        .collect(Collectors.joining("\n"));
 +    report = FateSummaryReport.fromJson(result);
 +    assertNotNull(report);
 +    assertNotEquals(0, report.getReportTime());
 +    assertFalse(report.getStatusCounts().isEmpty());
 +    assertFalse(report.getStepCounts().isEmpty());
 +    assertFalse(report.getCmdCounts().isEmpty());
 +    assertEquals(Set.of("NEW"), report.getStatusFilterNames());
 +    assertTrue(report.getInstanceTypesFilterNames().isEmpty());
 +    assertTrue(report.getFateIdFilter().isEmpty());
 +    validateFateDetails(report.getFateDetails(), 2, fateIdsStarted);
 +
 +    /*
 +     * Test filtering by FateInstanceType
 +     */
 +
 +    // validate FateInstanceType filter by only including transactions with 
META filter
 +    p = getCluster().exec(Admin.class, "fate", "--summary", "-j", "-t", 
"META");
 +    assertEquals(0, p.getProcess().waitFor());
 +    result = p.readStdOut();
 +    result = result.lines().filter(line -> 
!line.matches(".*(INFO|DEBUG|WARN|ERROR).*"))
 +        .collect(Collectors.joining("\n"));
 +    report = FateSummaryReport.fromJson(result);
 +    assertNotNull(report);
 +    assertNotEquals(0, report.getReportTime());
 +    assertFalse(report.getStatusCounts().isEmpty());
 +    assertFalse(report.getStepCounts().isEmpty());
 +    assertFalse(report.getCmdCounts().isEmpty());
 +    assertTrue(report.getStatusFilterNames().isEmpty());
 +    assertEquals(Set.of("META"), report.getInstanceTypesFilterNames());
 +    assertTrue(report.getFateIdFilter().isEmpty());
 +    if (store.type() == FateInstanceType.META) {
 +      validateFateDetails(report.getFateDetails(), 2, fateIdsStarted);
 +    } else { // USER
 +      validateFateDetails(report.getFateDetails(), 0, fateIdsStarted);
 +    }
 +
 +    // validate FateInstanceType filter by only including transactions with 
USER filter
 +    p = getCluster().exec(Admin.class, "fate", "--summary", "-j", "-t", 
"USER");
 +    assertEquals(0, p.getProcess().waitFor());
 +    result = p.readStdOut();
 +    result = result.lines().filter(line -> 
!line.matches(".*(INFO|DEBUG|WARN|ERROR).*"))
 +        .collect(Collectors.joining("\n"));
 +    report = FateSummaryReport.fromJson(result);
 +    assertNotNull(report);
 +    assertNotEquals(0, report.getReportTime());
 +    assertFalse(report.getStatusCounts().isEmpty());
 +    assertFalse(report.getStepCounts().isEmpty());
 +    assertFalse(report.getCmdCounts().isEmpty());
 +    assertTrue(report.getStatusFilterNames().isEmpty());
 +    assertEquals(Set.of("USER"), report.getInstanceTypesFilterNames());
 +    assertTrue(report.getFateIdFilter().isEmpty());
 +    if (store.type() == FateInstanceType.META) {
 +      validateFateDetails(report.getFateDetails(), 0, fateIdsStarted);
 +    } else { // USER
 +      validateFateDetails(report.getFateDetails(), 2, fateIdsStarted);
 +    }
 +
 +    fate.shutdown(1, TimeUnit.MINUTES);
 +  }
 +
 +  @Test
 +  public void testFateSummaryCommandPlainText() throws Exception {
 +    executeTest(this::testFateSummaryCommandPlainText);
 +  }
 +
 +  /**
 +   * Runs the Admin "fate --summary" command without JSON output and checks that the status,
 +   * FateId and instance-type filters are echoed in the plain-text result.
 +   *
 +   * @param store the Fate store the test transactions are created in
 +   * @param sctx server context; not referenced by this test
 +   * @throws Exception if running the Admin process fails
 +   */
 +  protected void testFateSummaryCommandPlainText(FateStore<LatchTestEnv> store, ServerContext sctx)
 +      throws Exception {
 +    // Configure Fate
 +    Fate<LatchTestEnv> fate = initFateNoDeadResCleaner(store);
 +
 +    // Shut Fate down even when an assertion fails, matching the try/finally used in FateIT
 +    try {
 +      // Start some transactions
 +      FateId fateId1 = fate.startTransaction();
 +      FateId fateId2 = fate.startTransaction();
 +
 +      ProcessInfo p = getCluster().exec(Admin.class, "fate", fateId1.canonical(),
 +          fateId2.canonical(), "--summary", "-s", "NEW", "-t", store.type().name());
 +      assertEquals(0, p.getProcess().waitFor());
 +      String result = p.readStdOut();
 +      assertTrue(result.contains("Status Filters: [NEW]"));
 +      // The two ids may be printed in either order
 +      assertTrue(result
 +          .contains("Fate ID Filters: [" + fateId1.canonical() + ", " + fateId2.canonical() + "]")
 +          || result.contains(
 +              "Fate ID Filters: [" + fateId2.canonical() + ", " + fateId1.canonical() + "]"));
 +      assertTrue(result.contains("Instance Types Filters: [" + store.type().name() + "]"));
 +    } finally {
 +      fate.shutdown(1, TimeUnit.MINUTES);
 +    }
 +  }
 +
 +  @Test
 +  public void testFatePrintCommand() throws Exception {
 +    executeTest(this::testFatePrintCommand);
 +  }
 +
 +  /**
 +   * Exercises the Admin "fate --print" command with no filter and with status, FateId and
 +   * instance-type filters, checking which transactions are listed each time.
 +   *
 +   * @param store the Fate store the test transactions are created in
 +   * @param sctx server context; not referenced by this test
 +   * @throws Exception if running the Admin process or parsing its output fails
 +   */
 +  protected void testFatePrintCommand(FateStore<LatchTestEnv> store, ServerContext sctx)
 +      throws Exception {
 +    // Configure Fate
 +    Fate<LatchTestEnv> fate = initFateNoDeadResCleaner(store);
 +
 +    // Shut Fate down even when an assertion fails, matching the try/finally used in FateIT
 +    try {
 +      // validate no transactions
 +      ProcessInfo p = getCluster().exec(Admin.class, "fate", "--print");
 +      assertEquals(0, p.getProcess().waitFor());
 +      String result = p.readStdOut();
 +      assertTrue(result.contains(" 0 transactions"));
 +
 +      // create Fate transactions
 +      FateId fateId1 = fate.startTransaction();
 +      FateId fateId2 = fate.startTransaction();
 +      Map<String,String> bothNew = Map.of(fateId1.canonical(), "NEW", fateId2.canonical(), "NEW");
 +
 +      // Get all transactions. Should be 2 FateIds with a NEW status
 +      Map<String,String> fateIdsFromResult = execPrintAndParse("fate", "--print");
 +      assertEquals(bothNew, fateIdsFromResult);
 +
 +      /*
 +       * Test filtering by States
 +       */
 +
 +      // Filter by NEW state
 +      fateIdsFromResult = execPrintAndParse("fate", "--print", "-s", "NEW");
 +      assertEquals(bothNew, fateIdsFromResult);
 +
 +      // Filter by FAILED state
 +      fateIdsFromResult = execPrintAndParse("fate", "--print", "-s", "FAILED");
 +      assertTrue(fateIdsFromResult.isEmpty());
 +
 +      /*
 +       * Test filtering by FateIds
 +       */
 +
 +      // Filter by one FateId
 +      fateIdsFromResult = execPrintAndParse("fate", fateId1.canonical(), "--print");
 +      assertEquals(Map.of(fateId1.canonical(), "NEW"), fateIdsFromResult);
 +
 +      // Filter by both FateIds
 +      fateIdsFromResult =
 +          execPrintAndParse("fate", fateId1.canonical(), fateId2.canonical(), "--print");
 +      assertEquals(bothNew, fateIdsFromResult);
 +
 +      // Filter by non-existent FateId
 +      FateId fakeFateId = FateId.from(store.type(), UUID.randomUUID());
 +      fateIdsFromResult = execPrintAndParse("fate", fakeFateId.canonical(), "--print");
 +      assertEquals(0, fateIdsFromResult.size());
 +
 +      /*
 +       * Test filtering by FateInstanceType
 +       */
 +
 +      // Test filter by USER FateInstanceType
 +      fateIdsFromResult = execPrintAndParse("fate", "--print", "-t", "USER");
 +      if (store.type() == FateInstanceType.META) {
 +        assertTrue(fateIdsFromResult.isEmpty());
 +      } else { // USER
 +        assertEquals(bothNew, fateIdsFromResult);
 +      }
 +
 +      // Test filter by META FateInstanceType
 +      fateIdsFromResult = execPrintAndParse("fate", "--print", "-t", "META");
 +      if (store.type() == FateInstanceType.META) {
 +        assertEquals(bothNew, fateIdsFromResult);
 +      } else { // USER
 +        assertTrue(fateIdsFromResult.isEmpty());
 +      }
 +    } finally {
 +      fate.shutdown(1, TimeUnit.MINUTES);
 +    }
 +  }
 +
 +  // Runs Admin with the given args, asserts a zero exit code and parses stdout with
 +  // getFateIdsFromPrint.
 +  private Map<String,String> execPrintAndParse(String... adminArgs) throws Exception {
 +    ProcessInfo p = getCluster().exec(Admin.class, adminArgs);
 +    assertEquals(0, p.getProcess().waitFor());
 +    return getFateIdsFromPrint(p.readStdOut());
 +  }
 +
 +  @Test
 +  public void testTransactionNameAndStep() throws Exception {
 +    executeTest(this::testTransactionNameAndStep);
 +  }
 +
 +  /**
 +   * Checks that the summary and print commands report the fate operation name and the current
 +   * step of transactions started through table compactions.
 +   *
 +   * @param store not referenced by this test; the transactions come from the compact calls below
 +   * @param sctx server context; not referenced by this test
 +   */
 +  protected void testTransactionNameAndStep(FateStore<LatchTestEnv> store, 
ServerContext sctx)
 +      throws Exception {
 +    // Since the other tests just use NEW transactions for simplicity, there 
are some fields of the
 +    // summary and print outputs which are null and not tested for 
(transaction name and transaction
 +    // step). This test uses seeded/in progress transactions to test that the 
summary and print
 +    // commands properly output these fields.
 +    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProperties()).build()) {
 +      final String table = getUniqueNames(1)[0];
 +
 +      // NOTE(review): the SlowIterator at majc scope presumably keeps the compactions running
 +      // long enough for the commands below to observe them - confirm
 +      IteratorSetting is = new IteratorSetting(1, SlowIterator.class);
 +      is.addOption("sleepTime", "10000");
 +
 +      NewTableConfiguration cfg = new NewTableConfiguration();
 +      cfg.attachIterator(is, EnumSet.of(IteratorUtil.IteratorScope.majc));
 +      client.tableOperations().create(table, cfg);
 +
 +      ReadWriteIT.ingest(client, 10, 10, 10, 0, table);
 +      client.tableOperations().flush(table, null, null, true);
 +
 +      // create 2 Fate transactions
 +      // NOTE(review): the trailing false is presumably wait=false, so these calls return while
 +      // the compactions are still in progress - confirm against TableOperations.compact
 +      client.tableOperations().compact(table, null, null, false, false);
 +      client.tableOperations().compact(table, null, null, false, false);
 +      List<String> fateIdsStarted = new ArrayList<>();
 +
 +      ProcessInfo p = getCluster().exec(Admin.class, "fate", "--summary", 
"-j");
 +      assertEquals(0, p.getProcess().waitFor());
 +
 +      String result = p.readStdOut();
 +      // Drop log lines that are mixed into stdout so that only the JSON document remains
 +      result = result.lines().filter(line -> 
!line.matches(".*(INFO|DEBUG|WARN|ERROR).*"))
 +          .collect(Collectors.joining("\n"));
 +      FateSummaryReport report = FateSummaryReport.fromJson(result);
 +
 +      // Validate transaction name and transaction step from summary command
 +
 +      for (FateTxnDetails d : report.getFateDetails()) {
 +        assertEquals("TABLE_COMPACT", d.getFateOp());
 +        assertEquals("CompactionDriver", d.getStep());
 +        fateIdsStarted.add(d.getFateId());
 +      }
 +      // Also guards the get(0)/get(1) lookups in the filter below
 +      assertEquals(2, fateIdsStarted.size());
 +
 +      p = getCluster().exec(Admin.class, "fate", "--print");
 +      assertEquals(0, p.getProcess().waitFor());
 +      result = p.readStdOut();
 +
 +      // Validate transaction name and transaction step from print command
 +
 +      String[] lines = result.split("\n");
 +      // Filter out the result to just include the info about the transactions
 +      List<String> transactionInfo = Arrays.stream(lines)
 +          .filter(
 +              line -> line.contains(fateIdsStarted.get(0)) || 
line.contains(fateIdsStarted.get(1)))
 +          .collect(Collectors.toList());
 +      assertEquals(2, transactionInfo.size());
 +      for (String info : transactionInfo) {
 +        assertTrue(info.contains("TABLE_COMPACT"));
 +        assertTrue(info.contains("op: CompactionDriver"));
 +      }
 +
 +      client.tableOperations().delete(table);
 +    }
 +  }
 +
 +  @Test
 +  public void testFateCancelCommand() throws Exception {
 +    executeTest(this::testFateCancelCommand);
 +  }
 +
 +  /**
 +   * Cancels one of two NEW transactions with the Admin "fate --cancel" command and checks that
 +   * the summary then reports it as FAILED while the other stays NEW.
 +   *
 +   * @param store the Fate store the test transactions are created in
 +   * @param sctx server context; not referenced by this test
 +   * @throws Exception if running the Admin process fails
 +   */
 +  protected void testFateCancelCommand(FateStore<LatchTestEnv> store, ServerContext sctx)
 +      throws Exception {
 +    // Configure Fate
 +    Fate<LatchTestEnv> fate = initFateNoDeadResCleaner(store);
 +
 +    // Shut Fate down even when an assertion fails, matching the try/finally used in FateIT
 +    try {
 +      // Start some transactions
 +      FateId fateId1 = fate.startTransaction();
 +      FateId fateId2 = fate.startTransaction();
 +
 +      // Check that summary output lists both the transactions with a NEW status
 +      Map<String,String> fateIdsFromSummary = getFateIdsFromSummary();
 +      assertEquals(Map.of(fateId1.canonical(), "NEW", fateId2.canonical(), "NEW"),
 +          fateIdsFromSummary);
 +
 +      // Cancel the first transaction and ensure that it was cancelled
 +      ProcessInfo p = getCluster().exec(Admin.class, "fate", fateId1.canonical(), "--cancel");
 +      assertEquals(0, p.getProcess().waitFor());
 +      String result = p.readStdOut();
 +
 +      assertTrue(result
 +          .contains("transaction " + fateId1.canonical() + " was cancelled or already completed"));
 +      fateIdsFromSummary = getFateIdsFromSummary();
 +      assertEquals(Map.of(fateId1.canonical(), "FAILED", fateId2.canonical(), "NEW"),
 +          fateIdsFromSummary);
 +    } finally {
 +      fate.shutdown(1, TimeUnit.MINUTES);
 +    }
 +  }
 +
 +  @Test
 +  public void testFateFailCommandTimeout() throws Exception {
 +    stopManagerAndExecuteTest(this::testFateFailCommandTimeout);
 +  }
 +
 +  /**
 +   * Checks that the Admin "fate --fail" command gives up on a transaction that the running Fate
 +   * instance has reserved, leaving that transaction IN_PROGRESS.
 +   *
 +   * @param store the Fate store the test transactions are created in
 +   * @param sctx server context; not referenced by this test
 +   * @throws Exception if running the Admin process fails or a wait times out
 +   */
 +  protected void testFateFailCommandTimeout(FateStore<LatchTestEnv> store, ServerContext sctx)
 +      throws Exception {
 +    // Configure Fate
 +    LatchTestEnv env = new LatchTestEnv();
 +    FastFate<LatchTestEnv> fate = initFateWithDeadResCleaner(store, env);
 +
 +    try {
 +      // Start some transactions
 +      FateId fateId1 = fate.startTransaction();
 +      FateId fateId2 = fate.startTransaction();
 +
 +      // Check that summary output lists both the transactions with a NEW status
 +      Map<String,String> fateIdsFromSummary = getFateIdsFromSummary();
 +      assertEquals(Map.of(fateId1.canonical(), "NEW", fateId2.canonical(), "NEW"),
 +          fateIdsFromSummary);
 +
 +      // Seed the transaction with the latch repo, so we can have an IN_PROGRESS transaction
 +      fate.seedTransaction(TEST_FATE_OP, fateId1, new LatchTestRepo(), true, "test");
 +      // Wait for 'fate' to reserve fateId1 (will be IN_PROGRESS on fateId1)
 +      Wait.waitFor(() -> env.numWorkers.get() == 1);
 +
 +      // Try to fail fateId1
 +      // This should not work as it is already reserved and being worked on by our running FATE
 +      // ('fate'). Admin should try to reserve it for a bit, but should fail and exit
 +      ProcessInfo p = getCluster().exec(Admin.class, "fate", fateId1.canonical(), "--fail");
 +      assertEquals(0, p.getProcess().waitFor());
 +      String result = p.readStdOut();
 +
 +      assertTrue(result.contains("Could not fail " + fateId1 + " in a reasonable time"));
 +      fateIdsFromSummary = getFateIdsFromSummary();
 +      assertEquals(Map.of(fateId1.canonical(), "IN_PROGRESS", fateId2.canonical(), "NEW"),
 +          fateIdsFromSummary);
 +    } finally {
 +      // Finish work and shutdown. Done in finally so that a failed assertion above does not
 +      // skip releasing the latch or shutting Fate down.
 +      env.workersLatch.countDown();
 +      fate.shutdown(1, TimeUnit.MINUTES);
 +    }
 +  }
 +
 +  @Test
 +  public void testFateFailCommandSuccess() throws Exception {
 +    executeTest(this::testFateFailCommandSuccess);
 +  }
 +
 +  /**
 +   * Checks that {@code admin fate <id> --fail} succeeds on a NEW transaction that nothing has
 +   * reserved, leaving it FAILED_IN_PROGRESS or FAILED.
 +   *
 +   * @param store the fate store used by the Fate instance this test creates
 +   * @param sctx the server context; not used by this test
 +   */
 +  protected void testFateFailCommandSuccess(FateStore<LatchTestEnv> store, ServerContext sctx)
 +      throws Exception {
 +    // Configure Fate
 +    Fate<LatchTestEnv> fate = initFateNoDeadResCleaner(store);
 +
 +    try {
 +      // Start some transactions
 +      FateId fateId1 = fate.startTransaction();
 +      FateId fateId2 = fate.startTransaction();
 +
 +      // Check that summary output lists both the transactions with a NEW status
 +      Map<String,String> fateIdsFromSummary = getFateIdsFromSummary();
 +      assertEquals(Map.of(fateId1.canonical(), "NEW", fateId2.canonical(), "NEW"),
 +          fateIdsFromSummary);
 +
 +      // Try to fail fateId1
 +      // This should work since nothing has fateId1 reserved (it is NEW)
 +      ProcessInfo p = getCluster().exec(Admin.class, "fate", fateId1.canonical(), "--fail");
 +      assertEquals(0, p.getProcess().waitFor());
 +      String result = p.readStdOut();
 +
 +      assertTrue(result.contains("Failing transaction: " + fateId1));
 +      fateIdsFromSummary = getFateIdsFromSummary();
 +      // Either status is accepted for fateId1
 +      Map<String,String> failedInProgress =
 +          Map.of(fateId1.canonical(), "FAILED_IN_PROGRESS", fateId2.canonical(), "NEW");
 +      Map<String,String> failed =
 +          Map.of(fateId1.canonical(), "FAILED", fateId2.canonical(), "NEW");
 +      assertTrue(
 +          fateIdsFromSummary.equals(failedInProgress) || fateIdsFromSummary.equals(failed),
 +          "Unexpected summary after --fail: " + fateIdsFromSummary);
 +    } finally {
 +      // Shut down even when an assertion above fails so the Fate threads do not outlive the test
 +      fate.shutdown(1, TimeUnit.MINUTES);
 +    }
 +  }
 +
 +  @Test
 +  public void testFateDeleteCommandTimeout() throws Exception {
 +    stopManagerAndExecuteTest(this::testFateDeleteCommandTimeout);
 +  }
 +
 +  /**
 +   * Checks that {@code admin fate <id> --delete} reports a timeout and leaves the transaction
 +   * IN_PROGRESS when the transaction is already reserved by the Fate instance started here.
 +   *
 +   * @param store the fate store used by the Fate instance this test creates
 +   * @param sctx the server context; not used by this test
 +   */
 +  protected void testFateDeleteCommandTimeout(FateStore<LatchTestEnv> store, ServerContext sctx)
 +      throws Exception {
 +    // Configure Fate
 +    LatchTestEnv env = new LatchTestEnv();
 +    FastFate<LatchTestEnv> fate = initFateWithDeadResCleaner(store, env);
 +
 +    try {
 +      // Start some transactions
 +      FateId fateId1 = fate.startTransaction();
 +      FateId fateId2 = fate.startTransaction();
 +
 +      // Check that summary output lists both the transactions with a NEW status
 +      Map<String,String> fateIdsFromSummary = getFateIdsFromSummary();
 +      assertEquals(Map.of(fateId1.canonical(), "NEW", fateId2.canonical(), "NEW"),
 +          fateIdsFromSummary);
 +
 +      // Seed the transaction with the latch repo, so we can have an IN_PROGRESS transaction
 +      fate.seedTransaction(TEST_FATE_OP, fateId1, new LatchTestRepo(), true, "test");
 +      // Wait for 'fate' to reserve fateId1 (will be IN_PROGRESS on fateId1)
 +      Wait.waitFor(() -> env.numWorkers.get() == 1);
 +
 +      // Try to delete fateId1
 +      // This should not work as it is already reserved and being worked on by our running FATE
 +      // ('fate'). Admin should try to reserve it for a bit, but should fail and exit
 +      ProcessInfo p = getCluster().exec(Admin.class, "fate", fateId1.canonical(), "--delete");
 +      assertEquals(0, p.getProcess().waitFor());
 +      String result = p.readStdOut();
 +
 +      assertTrue(result.contains("Could not delete " + fateId1 + " in a reasonable time"));
 +      fateIdsFromSummary = getFateIdsFromSummary();
 +      assertEquals(Map.of(fateId1.canonical(), "IN_PROGRESS", fateId2.canonical(), "NEW"),
 +          fateIdsFromSummary);
 +    } finally {
 +      // Finish work and shutdown. Done in a finally block so a failed assertion above does not
 +      // leave the worker waiting on the latch or the Fate threads running
 +      env.workersLatch.countDown();
 +      fate.shutdown(1, TimeUnit.MINUTES);
 +    }
 +  }
 +
 +  @Test
 +  public void testFateDeleteCommandSuccess() throws Exception {
 +    executeTest(this::testFateDeleteCommandSuccess);
 +  }
 +
 +  /**
 +   * Checks that {@code admin fate <id> --delete} removes a NEW transaction that nothing has
 +   * reserved, so that only the other transaction remains in the summary.
 +   *
 +   * @param store the fate store used by the Fate instance this test creates
 +   * @param sctx the server context; not used by this test
 +   */
 +  protected void testFateDeleteCommandSuccess(FateStore<LatchTestEnv> store, ServerContext sctx)
 +      throws Exception {
 +    // Configure Fate
 +    Fate<LatchTestEnv> fate = initFateNoDeadResCleaner(store);
 +
 +    try {
 +      // Start some transactions
 +      FateId fateId1 = fate.startTransaction();
 +      FateId fateId2 = fate.startTransaction();
 +
 +      // Check that summary output lists both the transactions with a NEW status
 +      Map<String,String> fateIdsFromSummary = getFateIdsFromSummary();
 +      assertEquals(Map.of(fateId1.canonical(), "NEW", fateId2.canonical(), "NEW"),
 +          fateIdsFromSummary);
 +
 +      // Try to delete fateId1
 +      // This should work since nothing has fateId1 reserved (it is NEW)
 +      ProcessInfo p = getCluster().exec(Admin.class, "fate", fateId1.canonical(), "--delete");
 +      assertEquals(0, p.getProcess().waitFor());
 +      String result = p.readStdOut();
 +
 +      assertTrue(result.contains("Deleting transaction: " + fateId1));
 +      fateIdsFromSummary = getFateIdsFromSummary();
 +      assertEquals(Map.of(fateId2.canonical(), "NEW"), fateIdsFromSummary);
 +    } finally {
 +      // Shut down even when an assertion above fails so the Fate threads do not outlive the test
 +      fate.shutdown(1, TimeUnit.MINUTES);
 +    }
 +  }
 +
 +  @Test
 +  public void testFatePrintAndSummaryCommandsWithInProgressTxns() throws Exception {
 +    executeTest(this::testFatePrintAndSummaryCommandsWithInProgressTxns);
 +  }
 +
 +  /**
 +   * Calls {@code AdminUtil.getTransactionStatus()} against a partially mocked store whose
 +   * {@code list()} returns two existing FateIds and one that was never created, and checks that
 +   * the call completes and reports the missing one as UNKNOWN.
 +   *
 +   * @param store the real fate store used to create the two existing transactions
 +   * @param sctx the server context used to construct the partially mocked store
 +   */
 +  protected void testFatePrintAndSummaryCommandsWithInProgressTxns(FateStore<LatchTestEnv> store,
 +      ServerContext sctx) throws Exception {
 +    // This test was written for an issue with the 'admin fate --print' and 'admin fate --summary'
 +    // commands where transactions could complete mid-print causing the command to fail. These
 +    // commands first get a list of the transactions and then probe for info on the transactions.
 +    // If a transaction completed between getting the list and probing for info on that
 +    // transaction, the command would fail. This test ensures that this problem has been fixed
 +    // (if the tx no longer exists, it should just be ignored so the print/summary can complete).
 +    FateStore<LatchTestEnv> mockedStore;
 +
 +    // This error was occurring in AdminUtil.getTransactionStatus(), so we will test this method.
 +    if (store.type().equals(FateInstanceType.USER)) {
 +      Method listMethod = UserFateStore.class.getMethod("list");
 +      mockedStore = EasyMock.createMockBuilder(UserFateStore.class)
 +          .withConstructor(ClientContext.class, String.class, ZooUtil.LockID.class, Predicate.class)
 +          .withArgs(sctx, AccumuloTable.FATE.tableName(), null, null).addMockedMethod(listMethod)
 +          .createMock();
 +    } else {
 +      Method listMethod = MetaFateStore.class.getMethod("list");
 +      mockedStore = EasyMock.createMockBuilder(MetaFateStore.class)
 +          .withConstructor(ZooSession.class, ZooUtil.LockID.class, Predicate.class)
 +          .withArgs(sctx.getZooSession(), null, null).addMockedMethod(listMethod).createMock();
 +    }
 +
 +    // 3 FateIds, two that exist and one that does not. We are simulating that a transaction that
 +    // doesn't exist is accessed in getTransactionStatus() and ensuring that this doesn't cause
 +    // the method to fail or have any unexpected behavior.
 +    FateId tx1 = store.create();
 +    FateId tx2 = FateId.from(store.type(), UUID.randomUUID());
 +    FateId tx3 = store.create();
 +
 +    List<ReadOnlyFateStore.FateIdStatus> fateIdStatusList =
 +        List.of(createFateIdStatus(tx1), createFateIdStatus(tx2), createFateIdStatus(tx3));
 +    expect(mockedStore.list()).andReturn(fateIdStatusList.stream()).once();
 +
 +    replay(mockedStore);
 +
 +    AdminUtil.FateStatus status = null;
 +    try {
 +      status = AdminUtil.getTransactionStatus(Map.of(store.type(), mockedStore), null, null, null,
 +          new HashMap<>(), new HashMap<>());
 +    } catch (Exception e) {
 +      // Pass the exception as the cause so its stack trace shows up in the test report
 +      fail("An unexpected error occurred in getTransactionStatus():\n" + e, e);
 +    }
 +
 +    verify(mockedStore);
 +
 +    assertNotNull(status);
 +    var txns = status.getTransactions();
 +    // All three should be returned
 +    assertEquals(3, txns.size());
 +    assertEquals(List.of(tx1, tx2, tx3),
 +        txns.stream().map(AdminUtil.TransactionStatus::getFateId).collect(Collectors.toList()));
 +    // The two real FateIds should have NEW status and the fake one should be UNKNOWN
 +    assertEquals(
 +        List.of(ReadOnlyFateStore.TStatus.NEW, ReadOnlyFateStore.TStatus.UNKNOWN,
 +            ReadOnlyFateStore.TStatus.NEW),
 +        txns.stream().map(AdminUtil.TransactionStatus::getStatus).collect(Collectors.toList()));
 +    // None of them should have a name since none of them were seeded with work
 +    assertEquals(Arrays.asList(null, null, null),
 +        txns.stream().map(AdminUtil.TransactionStatus::getFateOp).collect(Collectors.toList()));
 +    // None of them should have a Repo since none of them were seeded with work
 +    assertEquals(Arrays.asList(null, null, null),
 +        txns.stream().map(AdminUtil.TransactionStatus::getTop).collect(Collectors.toList()));
 +    // The FateId that doesn't exist should have a creation time of 0, the others should not
 +    List<Long> timeCreated = txns.stream().map(AdminUtil.TransactionStatus::getTimeCreated)
 +        .collect(Collectors.toList());
 +    assertNotEquals(0L, timeCreated.get(0));
 +    assertEquals(0L, timeCreated.get(1));
 +    assertNotEquals(0L, timeCreated.get(2));
 +    // All should have the store.type() type
 +    assertEquals(List.of(store.type(), store.type(), store.type()), txns.stream()
 +        .map(AdminUtil.TransactionStatus::getInstanceType).collect(Collectors.toList()));
 +  }
 +
 +  /**
 +   * Wraps a FateId in a bare-bones {@link ReadOnlyFateStore.FateIdStatus}.
 +   *
 +   * @param fateId the id the returned status object is built around
 +   * @return a status object whose status is null and whose reservation and operation are empty
 +   */
 +  private ReadOnlyFateStore.FateIdStatus createFateIdStatus(FateId fateId) {
 +    // Callers only read the fateId from the result, so every other accessor is a null/empty stub
 +    return new AbstractFateStore.FateIdStatusBase(fateId) {
 +      @Override
 +      public Optional<Fate.FateOperation> getFateOperation() {
 +        return Optional.empty();
 +      }
 +
 +      @Override
 +      public Optional<FateStore.FateReservation> getFateReservation() {
 +        return Optional.empty();
 +      }
 +
 +      @Override
 +      public ReadOnlyFateStore.TStatus getStatus() {
 +        return null;
 +      }
 +    };
 +  }
 +
 +  /**
 +   * Parses the output of the --print fate command into a FateId to status map. The output is
 +   * split on single spaces; each status word is attributed to the most recently seen FateId.
 +   * Fails the test if a FateId that already has a recorded status is seen again.
 +   *
 +   * @param printResult the output of the --print fate command
 +   * @return a map of each of the FateIds to their status using the output of --print
 +   */
 +  private Map<String,String> getFateIdsFromPrint(String printResult) {
 +    Map<String,String> fateIdToStatus = new HashMap<>();
 +    String lastFateIdSeen = null;
 +    for (String word : printResult.split(" ")) {
 +      if (FateId.isFateId(word)) {
 +        if (fateIdToStatus.containsKey(word)) {
 +          // Put the reason in the failure itself rather than only in the debug log
 +          fail("--print listed the same transaction more than once. This should not occur,"
 +              + " failing: " + word);
 +        }
 +        lastFateIdSeen = word;
 +      } else if (wordIsTStatus(word)) {
 +        fateIdToStatus.put(lastFateIdSeen, word);
 +      }
 +    }
 +    return fateIdToStatus;
 +  }
 +
 +  /**
 +   * Runs {@code admin fate --summary -j}, asserts a zero exit code, and parses the JSON report.
 +   *
 +   * @return a map of each of the FateIds to their status using the --summary command
 +   */
 +  private Map<String,String> getFateIdsFromSummary() throws Exception {
 +    ProcessInfo summaryProc = getCluster().exec(Admin.class, "fate", "--summary", "-j");
 +    assertEquals(0, summaryProc.getProcess().waitFor());
 +
 +    // Drop any line mentioning a log level before handing the rest to the JSON parser
 +    String json = summaryProc.readStdOut().lines()
 +        .filter(line -> !line.matches(".*(INFO|DEBUG|WARN|ERROR).*"))
 +        .collect(Collectors.joining("\n"));
 +
 +    FateSummaryReport report = FateSummaryReport.fromJson(json);
 +    assertNotNull(report);
 +
 +    Map<String,String> statusByFateId = new HashMap<>();
 +    for (FateTxnDetails detail : report.getFateDetails()) {
 +      statusByFateId.put(detail.getFateId(), detail.getStatus());
 +    }
 +    return statusByFateId;
 +  }
 +
 +  /**
 +   * Asserts that the given details describe exactly the expected number of NEW transactions, each
 +   * of which was started by the caller, has placeholder step/op values, and holds or waits on no
 +   * locks.
 +   *
 +   * @param details the fate details from the {@link FateSummaryReport}
 +   * @param expDetailsSize the expected size of details
 +   * @param fateIdsStarted the list of fate ids that have been started
 +   */
 +  private void validateFateDetails(Set<FateTxnDetails> details, int expDetailsSize,
 +      List<String> fateIdsStarted) {
 +    assertEquals(expDetailsSize, details.size());
 +    details.forEach(txn -> {
 +      assertTrue(fateIdsStarted.contains(txn.getFateId()));
 +      assertEquals("NEW", txn.getStatus());
 +      assertEquals("?", txn.getStep());
 +      assertEquals("?", txn.getFateOp());
 +      assertNotEquals(0, txn.getRunning());
 +      assertEquals("[]", txn.getLocksHeld().toString());
 +      assertEquals("[]", txn.getLocksWaiting().toString());
 +    });
 +  }
 +
 +  protected FastFate<LatchTestEnv> 
initFateWithDeadResCleaner(FateStore<LatchTestEnv> store,
 +      LatchTestEnv env) {
 +    // Using FastFate so the cleanup will run often. This ensures that the 
cleanup will run when
 +    // there are reservations present and that the cleanup will not 
unexpectedly delete these live
 +    // reservations
 +    return new FastFate<>(env, store, true, Object::toString, 
DefaultConfiguration.getInstance());
 +  }
 +
 +  /**
 +   * Creates a plain {@link Fate} over the given store with a fresh {@link LatchTestEnv}, the
 +   * default configuration, and a new two-thread {@link ScheduledThreadPoolExecutor}. The
 +   * {@code false} argument presumably disables the dead reservation cleaner, going by this
 +   * method's name (confirm against the Fate constructor).
 +   */
 +  protected Fate<LatchTestEnv> 
initFateNoDeadResCleaner(FateStore<LatchTestEnv> store) {
 +    return new Fate<>(new LatchTestEnv(), store, false, Object::toString,
-         DefaultConfiguration.getInstance());
++        DefaultConfiguration.getInstance(), new 
ScheduledThreadPoolExecutor(2));
 +  }
 +
 +  /**
 +   * @param word the candidate token
 +   * @return true if {@code word} is exactly the name of a {@link ReadOnlyFateStore.TStatus}
 +   *         constant, false if {@code valueOf} rejects it with an IllegalArgumentException
 +   */
 +  private boolean wordIsTStatus(String word) {
 +    try {
 +      ReadOnlyFateStore.TStatus.valueOf(word);
 +      return true;
 +    } catch (IllegalArgumentException notAStatus) {
 +      return false;
 +    }
 +  }
 +
 +  /**
 +   * Stop the MANAGER. For some of our tests, we want to be able to seed transactions with our own
 +   * test repos. We want our fate to reserve these transactions (and not the real fates running in
 +   * the Manager as that will lead to exceptions since the real fates wouldn't be able to handle our
 +   * test repos). So, we essentially have the fates created here acting as the real fates: they have
 +   * the same threads running that the real fates would, use a fate store with a ZK lock, use the
 +   * same locations to store fate data that the Manager does, and are running in a separate process
 +   * from the Admin process. Note that we cannot simply use different locations for our fate data
 +   * from Manager to keep our test env separate from Manager. Admin uses the real fate data
 +   * locations, so our test must also use the real locations.
 +   *
 +   * @throws IOException declared by this method; presumably raised if stopping the servers fails
 +   */
 +  protected void stopManager() throws IOException {
 +    getCluster().getClusterControl().stopAllServers(ServerType.MANAGER);
 +  }
 +}
diff --cc test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java
index 7ff60e5bed,424248b590..15e429d581
--- a/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java
@@@ -18,15 -18,11 +18,16 @@@
   */
  package org.apache.accumulo.test.fate;
  
 +import java.util.Set;
++import java.util.concurrent.ScheduledThreadPoolExecutor;
  import java.util.function.Function;
  
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
  import org.apache.accumulo.core.fate.Fate;
 +import org.apache.accumulo.core.fate.FateExecutor;
 +import org.apache.accumulo.core.fate.FateId;
 +import org.apache.accumulo.core.fate.FateStore;
  import org.apache.accumulo.core.fate.Repo;
 -import org.apache.accumulo.core.fate.TStore;
  
  import com.google.common.base.Preconditions;
  
@@@ -35,9 -31,8 +36,9 @@@
   */
  public class FlakyFate<T> extends Fate<T> {
  
 -  public FlakyFate(T environment, TStore<T> store, Function<Repo<T>,String> 
toLogStrFunc) {
 -    super(environment, store, toLogStrFunc);
 +  public FlakyFate(T environment, FateStore<T> store, 
Function<Repo<T>,String> toLogStrFunc,
 +      AccumuloConfiguration conf) {
-     super(environment, store, false, toLogStrFunc, conf);
++    super(environment, store, false, toLogStrFunc, conf, new 
ScheduledThreadPoolExecutor(2));
    }
  
    @Override
diff --cc test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java
index 472f9263c9,0000000000..0637b520a2
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java
@@@ -1,475 -1,0 +1,477 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.test.fate;
 +
 +import static org.apache.accumulo.test.fate.FateTestUtil.TEST_FATE_OP;
 +import static org.junit.jupiter.api.Assertions.assertEquals;
 +import static org.junit.jupiter.api.Assertions.assertFalse;
 +import static org.junit.jupiter.api.Assertions.assertThrows;
 +import static org.junit.jupiter.api.Assertions.assertTrue;
 +
 +import java.time.Duration;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.UUID;
 +import java.util.concurrent.CountDownLatch;
++import java.util.concurrent.ScheduledThreadPoolExecutor;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.function.Predicate;
 +
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.DefaultConfiguration;
 +import org.apache.accumulo.core.fate.Fate;
 +import org.apache.accumulo.core.fate.FateId;
 +import org.apache.accumulo.core.fate.FateStore;
 +import org.apache.accumulo.core.fate.ReadOnlyFateStore;
 +import org.apache.accumulo.core.fate.Repo;
 +import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
 +import org.apache.accumulo.harness.SharedMiniClusterBase;
 +import org.apache.accumulo.test.util.Wait;
 +import org.apache.zookeeper.KeeperException;
 +import org.junit.jupiter.api.Test;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.collect.Sets;
 +
 +public abstract class MultipleStoresIT extends SharedMiniClusterBase {
 +
 +  private static final Logger LOG = 
LoggerFactory.getLogger(MultipleStoresIT.class);
 +
 +  @Test
 +  public void testReserveUnreserve() throws Exception {
 +    executeSleepingEnvTest(this::testReserveUnreserve);
 +  }
 +
 +  /**
 +   * Checks that reserving and unreserving FateIds through one store instance is visible through
 +   * another store instance created from the same factory with a different lock.
 +   */
 +  private void testReserveUnreserve(TestStoreFactory<SleepingTestEnv> testStoreFactory)
 +      throws Exception {
 +    // reserving/unreserving a FateId should be reflected across instances of the stores
 +    final int numFateIds = 500;
 +    final ZooUtil.LockID lock1 = new ZooUtil.LockID("/locks", "L1", 50);
 +    final ZooUtil.LockID lock2 = new ZooUtil.LockID("/locks", "L2", 52);
 +    final FateStore<SleepingTestEnv> store1 = testStoreFactory.create(lock1, null);
 +    final FateStore<SleepingTestEnv> store2 = testStoreFactory.create(lock2, null);
 +    final FateId fakeFateId = FateId.from(store1.type(), UUID.randomUUID());
 +    final Set<FateId> allIds = new HashSet<>();
 +    final List<FateStore.FateTxStore<SleepingTestEnv>> reservations = new ArrayList<>();
 +
 +    // Create the fate ids using store1
 +    for (int created = 0; created < numFateIds; created++) {
 +      assertTrue(allIds.add(store1.create()));
 +    }
 +    assertEquals(numFateIds, allIds.size());
 +
 +    // Alternate between the stores: reserve with one, then confirm the other cannot reserve the
 +    // same id. It should not matter that all the ids were created using store1
 +    boolean reserveWithStore1 = true;
 +    for (FateId fateId : allIds) {
 +      FateStore<SleepingTestEnv> reserver = reserveWithStore1 ? store1 : store2;
 +      FateStore<SleepingTestEnv> other = reserveWithStore1 ? store2 : store1;
 +      reservations.add(reserver.reserve(fateId));
 +      assertTrue(other.tryReserve(fateId).isEmpty());
 +      reserveWithStore1 = !reserveWithStore1;
 +    }
 +    // Try to reserve a non-existent fate id
 +    assertTrue(store1.tryReserve(fakeFateId).isEmpty());
 +    assertTrue(store2.tryReserve(fakeFateId).isEmpty());
 +    // Both stores should return the same reserved transactions
 +    Map<FateId,FateStore.FateReservation> activeReservations = store1.getActiveReservations();
 +    assertEquals(allIds, activeReservations.keySet());
 +    activeReservations = store2.getActiveReservations();
 +    assertEquals(allIds, activeReservations.keySet());
 +
 +    // Test setting/getting the TStatus and unreserving the transactions
 +    for (var reservation : reservations) {
 +      assertEquals(ReadOnlyFateStore.TStatus.NEW, reservation.getStatus());
 +      reservation.setStatus(ReadOnlyFateStore.TStatus.SUBMITTED);
 +      assertEquals(ReadOnlyFateStore.TStatus.SUBMITTED, reservation.getStatus());
 +      reservation.delete();
 +      reservation.unreserve(Duration.ofMillis(0));
 +      // Attempt to set a status on a tx that has been unreserved (should throw exception)
 +      assertThrows(IllegalStateException.class,
 +          () -> reservation.setStatus(ReadOnlyFateStore.TStatus.NEW));
 +    }
 +    assertTrue(store1.getActiveReservations().isEmpty());
 +    assertTrue(store2.getActiveReservations().isEmpty());
 +  }
 +
 +  @Test
 +  public void testReserveNonExistentTxn() throws Exception {
 +    executeSleepingEnvTest(this::testReserveNonExistentTxn);
 +  }
 +
 +  /**
 +   * Checks that {@code reserve()} on a FateId that was never created throws an
 +   * IllegalStateException naming that id.
 +   */
 +  private void testReserveNonExistentTxn(TestStoreFactory<SleepingTestEnv> testStoreFactory)
 +      throws Exception {
 +    // Tests that reserve() doesn't hang indefinitely and instead throws an error
 +    // on reserve() a non-existent transaction.
 +    final ZooUtil.LockID lock = new ZooUtil.LockID("/locks", "L1", 50);
 +    final FateStore<SleepingTestEnv> store = testStoreFactory.create(lock, null);
 +    final FateId fakeFateId = FateId.from(store.type(), UUID.randomUUID());
 +
 +    IllegalStateException thrown =
 +        assertThrows(IllegalStateException.class, () -> store.reserve(fakeFateId));
 +    String message = thrown.getMessage();
 +    assertTrue(message.contains(fakeFateId.canonical()));
 +  }
 +
 +  @Test
 +  public void testReserveReservedAndUnreserveUnreserved() throws Exception {
 +    executeSleepingEnvTest(this::testReserveReservedAndUnreserveUnreserved);
 +  }
 +
 +  /**
 +   * Checks that an already reserved FateId cannot be reserved again and that unreserving an
 +   * already unreserved transaction throws an IllegalStateException.
 +   */
 +  private void testReserveReservedAndUnreserveUnreserved(
 +      TestStoreFactory<SleepingTestEnv> testStoreFactory) throws Exception {
 +    final int numFateIds = 500;
 +    final Duration noDelay = Duration.ofMillis(0);
 +    final ZooUtil.LockID lock = new ZooUtil.LockID("/locks", "L1", 50);
 +    final FateStore<SleepingTestEnv> store = testStoreFactory.create(lock, null);
 +    final Set<FateId> allIds = new HashSet<>();
 +    final List<FateStore.FateTxStore<SleepingTestEnv>> reservations = new ArrayList<>();
 +
 +    // Create some FateIds and ensure that they can be reserved
 +    for (int created = 0; created < numFateIds; created++) {
 +      FateId fateId = store.create();
 +      assertTrue(allIds.add(fateId));
 +      var maybeReserved = store.tryReserve(fateId);
 +      assertTrue(maybeReserved.isPresent());
 +      reservations.add(maybeReserved.orElseThrow());
 +    }
 +    assertEquals(numFateIds, allIds.size());
 +
 +    // Try to reserve again, should not reserve
 +    for (FateId fateId : allIds) {
 +      assertTrue(store.tryReserve(fateId).isEmpty());
 +    }
 +
 +    // Unreserve all the FateIds
 +    for (var txStore : reservations) {
 +      txStore.unreserve(noDelay);
 +    }
 +    // Try to unreserve again (should throw exception)
 +    for (var txStore : reservations) {
 +      assertThrows(IllegalStateException.class, () -> txStore.unreserve(noDelay));
 +    }
 +  }
 +
 +  @Test
 +  public void testReserveAfterUnreserveAndReserveAfterDeleted() throws Exception {
 +    executeSleepingEnvTest(this::testReserveAfterUnreserveAndReserveAfterDeleted);
 +  }
 +
 +  /**
 +   * Checks that an unreserved transaction can be reserved again, and that once it has been
 +   * deleted its status reads UNKNOWN and {@code reserve()} throws an IllegalStateException.
 +   */
 +  private void testReserveAfterUnreserveAndReserveAfterDeleted(
 +      TestStoreFactory<SleepingTestEnv> testStoreFactory) throws Exception {
 +    final int numFateIds = 500;
 +    final Duration noDelay = Duration.ofMillis(0);
 +    final ZooUtil.LockID lock = new ZooUtil.LockID("/locks", "L1", 50);
 +    final FateStore<SleepingTestEnv> store = testStoreFactory.create(lock, null);
 +    final Set<FateId> allIds = new HashSet<>();
 +    final List<FateStore.FateTxStore<SleepingTestEnv>> reservations = new ArrayList<>();
 +
 +    // Create some FateIds and ensure that they can be reserved
 +    for (int created = 0; created < numFateIds; created++) {
 +      FateId fateId = store.create();
 +      assertTrue(allIds.add(fateId));
 +      var maybeReserved = store.tryReserve(fateId);
 +      assertTrue(maybeReserved.isPresent());
 +      reservations.add(maybeReserved.orElseThrow());
 +    }
 +    assertEquals(numFateIds, allIds.size());
 +
 +    // Unreserve all
 +    for (var txStore : reservations) {
 +      txStore.unreserve(noDelay);
 +    }
 +
 +    // Ensure they can be reserved again, and delete and unreserve this time
 +    for (FateId fateId : allIds) {
 +      // Verify that the tx status is still NEW after unreserving since it hasn't been deleted
 +      assertEquals(ReadOnlyFateStore.TStatus.NEW, store.read(fateId).getStatus());
 +      var maybeReserved = store.tryReserve(fateId);
 +      assertTrue(maybeReserved.isPresent());
 +      var txStore = maybeReserved.orElseThrow();
 +      txStore.delete();
 +      txStore.unreserve(noDelay);
 +    }
 +
 +    for (FateId fateId : allIds) {
 +      // Verify that the tx is now unknown since it has been deleted
 +      assertEquals(ReadOnlyFateStore.TStatus.UNKNOWN, store.read(fateId).getStatus());
 +      // Attempt to reserve a deleted txn, should throw an exception and not wait indefinitely
 +      IllegalStateException thrown =
 +          assertThrows(IllegalStateException.class, () -> store.reserve(fateId));
 +      assertTrue(thrown.getMessage().contains(fateId.canonical()));
 +    }
 +  }
 +
 +  @Test
 +  public void testMultipleFateInstances() throws Exception {
 +    executeSleepingEnvTest(this::testMultipleFateInstances);
 +  }
 +
 +  /**
 +   * Starts two Fate instances, each with its own store, lock and SleepingTestEnv, seeds 500
 +   * transactions alternating between them, waits for all of them through the second instance,
 +   * and then checks that the two environments' executed-op sets are disjoint and together equal
 +   * the set of seeded ids. Both instances are shut down in a finally block.
 +   */
 +  private void testMultipleFateInstances(TestStoreFactory<SleepingTestEnv> 
testStoreFactory)
 +      throws Exception {
 +    final int numFateIds = 500;
 +    final Set<FateId> allIds = new HashSet<>();
 +    final SleepingTestEnv testEnv1 = new SleepingTestEnv(50);
 +    final SleepingTestEnv testEnv2 = new SleepingTestEnv(50);
 +    final ZooUtil.LockID lock1 = new ZooUtil.LockID("/locks", "L1", 50);
 +    final ZooUtil.LockID lock2 = new ZooUtil.LockID("/locks", "L2", 52);
 +    final Set<ZooUtil.LockID> liveLocks = new HashSet<>();
 +    // The predicate reads liveLocks at call time, so locks added below are seen by the stores
 +    final Predicate<ZooUtil.LockID> isLockHeld = liveLocks::contains;
 +    final FateStore<SleepingTestEnv> store1 = testStoreFactory.create(lock1, 
isLockHeld);
 +    final FateStore<SleepingTestEnv> store2 = testStoreFactory.create(lock2, 
isLockHeld);
 +
 +    liveLocks.add(lock1);
 +    liveLocks.add(lock2);
 +
-     Fate<SleepingTestEnv> fate1 =
-         new Fate<>(testEnv1, store1, true, Object::toString, 
DefaultConfiguration.getInstance());
-     Fate<SleepingTestEnv> fate2 =
-         new Fate<>(testEnv2, store2, false, Object::toString, 
DefaultConfiguration.getInstance());
++    Fate<SleepingTestEnv> fate1 = new Fate<>(testEnv1, store1, true, 
Object::toString,
++        DefaultConfiguration.getInstance(), new 
ScheduledThreadPoolExecutor(2));
++    Fate<SleepingTestEnv> fate2 = new Fate<>(testEnv2, store2, false, 
Object::toString,
++        DefaultConfiguration.getInstance(), new 
ScheduledThreadPoolExecutor(2));
 +
 +    try {
 +      for (int i = 0; i < numFateIds; i++) {
 +        FateId fateId;
 +        // Start half the txns using fate1, and the other half using fate2
 +        if (i % 2 == 0) {
 +          fateId = fate1.startTransaction();
 +          fate1.seedTransaction(TEST_FATE_OP, fateId, new SleepingTestRepo(), 
true, "test");
 +        } else {
 +          fateId = fate2.startTransaction();
 +          fate2.seedTransaction(TEST_FATE_OP, fateId, new SleepingTestRepo(), 
true, "test");
 +        }
 +        allIds.add(fateId);
 +      }
 +      assertEquals(numFateIds, allIds.size());
 +
 +      // Should be able to wait for completion on any fate instance
 +      for (FateId fateId : allIds) {
 +        fate2.waitForCompletion(fateId);
 +      }
 +      // Ensure that all txns have been executed and have only been executed 
once
 +      assertTrue(Collections.disjoint(testEnv1.executedOps, 
testEnv2.executedOps));
 +      assertEquals(allIds, Sets.union(testEnv1.executedOps, 
testEnv2.executedOps));
 +    } finally {
 +      fate1.shutdown(1, TimeUnit.MINUTES);
 +      fate2.shutdown(1, TimeUnit.MINUTES);
 +    }
 +  }
 +
 +  // JUnit entry point: hands the private testDeadReservationsCleanup overload to
 +  // executeLatchEnvTest, which supplies the TestStoreFactory it needs
 +  @Test
 +  public void testDeadReservationsCleanup() throws Exception {
 +    executeLatchEnvTest(this::testDeadReservationsCleanup);
 +  }
 +
 +  private void testDeadReservationsCleanup(TestStoreFactory<LatchTestEnv> 
testStoreFactory)
 +      throws Exception {
 +    // Tests reserving some transactions, then simulating that the Manager 
died by creating
 +    // a new Fate instance and store with a new LockID. The transactions 
which were
 +    // reserved using the old LockID should be cleaned up by Fate's 
DeadReservationCleaner,
 +    // then picked up by the new Fate/store.
 +
 +    // > 1 to have some concurrency to simulate realistic fate scenario
 +    final int numThreads = 10;
 +    // One transaction for each FATE worker thread
 +    final int numFateIds = numThreads;
 +    final Set<FateId> allIds = new HashSet<>();
 +    final LatchTestEnv testEnv1 = new LatchTestEnv();
 +    final LatchTestEnv testEnv2 = new LatchTestEnv();
 +    final ZooUtil.LockID lock1 = new ZooUtil.LockID("/locks", "L1", 50);
 +    final ZooUtil.LockID lock2 = new ZooUtil.LockID("/locks", "L2", 52);
 +    final Set<ZooUtil.LockID> liveLocks = new HashSet<>(); // NOTE(review): plain HashSet mutated by this thread; if isLockHeld is invoked from Fate background threads the access is unsynchronized - confirm
 +    final Predicate<ZooUtil.LockID> isLockHeld = liveLocks::contains; // a lock counts as held only while it is a member of liveLocks
 +    final AccumuloConfiguration config = 
FateTestUtil.createTestFateConfig(numThreads);
 +    Map<FateId,FateStore.FateReservation> reservations;
 +
 +    final FateStore<LatchTestEnv> store1 = testStoreFactory.create(lock1, 
isLockHeld);
 +    liveLocks.add(lock1); // lock1 is reported as held before fate1 is constructed
 +    Fate<LatchTestEnv> fate1 = null;
 +    Fate<LatchTestEnv> fate2 = null;
 +
 +    try {
 +      fate1 = new FastFate<>(testEnv1, store1, true, Object::toString, 
config); // third argument is true here and false for fate2; presumably it enables the dead reservation cleaner - confirm against the Fate constructor
 +      // Ensure nothing is reserved yet
 +      assertTrue(store1.getActiveReservations().isEmpty());
 +
 +      // Create transactions
 +      for (int i = 0; i < numFateIds; i++) {
 +        FateId fateId;
 +        fateId = fate1.startTransaction();
 +        fate1.seedTransaction(TEST_FATE_OP, fateId, new LatchTestRepo(), 
true, "test");
 +        allIds.add(fateId);
 +      }
 +      assertEquals(numFateIds, allIds.size()); // allIds is a Set, so this also checks that every started transaction got a distinct FateId
 +
 +      // Wait for all the fate worker threads to start working on the 
transactions
 +      Wait.waitFor(() -> testEnv1.numWorkers.get() == numFateIds); // numWorkers is incremented inside LatchTestRepo.call() before it blocks on workersLatch
 +      // Each fate worker will be hung up working (IN_PROGRESS) on a single 
transaction
 +
 +      // Verify store1 has the transactions reserved and that they were 
reserved with lock1
 +      reservations = store1.getActiveReservations();
 +      assertEquals(allIds, reservations.keySet());
 +      reservations.values().forEach(res -> assertEquals(lock1, 
res.getLockID()));
 +
 +      final FateStore<LatchTestEnv> store2 = testStoreFactory.create(lock2, 
isLockHeld); // store2 shares the same isLockHeld predicate (and therefore liveLocks) as store1
 +
 +      // Verify store2 can see the reserved transactions even though they 
were reserved using
 +      // store1
 +      reservations = store2.getActiveReservations();
 +      assertEquals(allIds, reservations.keySet());
 +      reservations.values().forEach(res -> assertEquals(lock1, 
res.getLockID()));
 +
 +      // Simulate what would happen if the Manager using the Fate object 
(fate1) died.
 +      // isLockHeld would return false for the LockId of the Manager that 
died (in this case, lock1)
 +      // and true for the new Manager's lock (lock2)
 +      liveLocks.remove(lock1); // from here on isLockHeld.test(lock1) is false
 +      liveLocks.add(lock2); // and isLockHeld.test(lock2) is true
 +
 +      // Create the new Fate/start the Fate threads (the work finder and the 
workers).
 +      // Don't run another dead reservation cleaner since we already have one 
running from fate1.
-       fate2 = new Fate<>(testEnv2, store2, false, Object::toString, config);
++      fate2 = new Fate<>(testEnv2, store2, false, Object::toString, config,
++          new ScheduledThreadPoolExecutor(2)); // NOTE(review): this executor is not shut down anywhere in this method - confirm Fate.shutdown() releases it
 +
 +      // Wait for the "dead" reservations to be deleted and picked up again 
(reserved using
 +      // fate2/store2/lock2 now).
 +      // They are considered "dead" if they are held by lock1 in this test. 
We don't have to worry
 +      // about fate1/store1/lock1 being used to reserve the transactions 
again since all
 +      // the workers for fate1 are hung up
 +      Wait.waitFor(() -> {
 +        Map<FateId,FateStore.FateReservation> store2Reservations = 
store2.getActiveReservations();
 +        boolean allReservedWithLock2 =
 +            store2Reservations.values().stream().allMatch(entry -> 
entry.getLockID().equals(lock2));
 +        return store2Reservations.keySet().equals(allIds) && 
allReservedWithLock2;
 +      }, fate1.getDeadResCleanupDelay().toMillis() * 2); // second argument is twice fate1's dead-reservation cleanup delay in millis; presumably the wait timeout - confirm against Wait.waitFor
 +    } finally {
 +      // Finish work and shutdown
 +      testEnv1.workersLatch.countDown(); // each latch has a count of 1, so this releases every worker blocked in LatchTestRepo.call() for that env
 +      testEnv2.workersLatch.countDown();
 +      if (fate1 != null) { // fate1/fate2 are still null if their constructors were never reached or threw
 +        fate1.shutdown(1, TimeUnit.MINUTES);
 +      }
 +      if (fate2 != null) {
 +        fate2.shutdown(1, TimeUnit.MINUTES);
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Test {@link Repo} that records the transaction it was called for in
 +   * {@link SleepingTestEnv#executedOps} and then sleeps for {@link SleepingTestEnv#sleepTimeMs}
 +   * milliseconds. {@code call} always returns {@code null}; {@code undo} does nothing.
 +   */
 +  public static class SleepingTestRepo implements Repo<SleepingTestEnv> {
 +    private static final long serialVersionUID = 1L;
 +
 +    @Override
 +    public String getName() {
 +      return null;
 +    }
 +
 +    @Override
 +    public String getReturn() {
 +      return null;
 +    }
 +
 +    @Override
 +    public long isReady(FateId fateId, SleepingTestEnv environment) {
 +      return 0;
 +    }
 +
 +    @Override
 +    public Repo<SleepingTestEnv> call(FateId fateId, SleepingTestEnv environment)
 +        throws Exception {
 +      // Record the transaction first so it is visible even while this thread is asleep
 +      environment.executedOps.add(fateId);
 +
 +      final int napMillis = environment.sleepTimeMs;
 +      final String startMsg = "Thread " + Thread.currentThread()
 +          + " in SleepingTestRepo.call() sleeping for " + napMillis + " millis";
 +      LOG.debug(startMsg);
 +
 +      // Simulate some work
 +      Thread.sleep(napMillis);
 +
 +      final String doneMsg =
 +          "Thread " + Thread.currentThread() + " finished SleepingTestRepo.call()";
 +      LOG.debug(doneMsg);
 +      return null;
 +    }
 +
 +    @Override
 +    public void undo(FateId fateId, SleepingTestEnv environment) {
 +      // nothing to undo
 +    }
 +  }
 +
 +  /**
 +   * Environment handed to {@link SleepingTestRepo}: carries how long each call should sleep and
 +   * a synchronized set collecting the ids of the transactions that were executed.
 +   */
 +  public static class SleepingTestEnv extends MultipleStoresTestEnv {
 +    public final int sleepTimeMs;
 +    public final Set<FateId> executedOps;
 +
 +    public SleepingTestEnv(int sleepTimeMs) {
 +      // Synchronized wrapper: the set is written from SleepingTestRepo.call()
 +      this.executedOps = Collections.synchronizedSet(new HashSet<>());
 +      this.sleepTimeMs = sleepTimeMs;
 +    }
 +  }
 +
 +  /**
 +   * Test {@link Repo} whose {@code call} bumps {@link LatchTestEnv#numWorkers}, blocks on
 +   * {@link LatchTestEnv#workersLatch} until it is released, then decrements the counter again
 +   * and returns {@code null}. {@code undo} does nothing.
 +   */
 +  public static class LatchTestRepo implements Repo<LatchTestEnv> {
 +    private static final long serialVersionUID = 1L;
 +
 +    @Override
 +    public String getName() {
 +      return null;
 +    }
 +
 +    @Override
 +    public String getReturn() {
 +      return null;
 +    }
 +
 +    @Override
 +    public long isReady(FateId fateId, LatchTestEnv environment) {
 +      return 0;
 +    }
 +
 +    @Override
 +    public Repo<LatchTestEnv> call(FateId fateId, LatchTestEnv environment) throws Exception {
 +      final Thread self = Thread.currentThread();
 +      LOG.debug("Thread " + self + " in LatchTestRepo.call()");
 +
 +      // Announce this worker, then park until the test counts the latch down
 +      environment.numWorkers.incrementAndGet();
 +      environment.workersLatch.await();
 +
 +      LOG.debug("Thread " + self + " finished LatchTestRepo.call()");
 +      environment.numWorkers.decrementAndGet();
 +      return null;
 +    }
 +
 +    @Override
 +    public void undo(FateId fateId, LatchTestEnv environment) {
 +      // nothing to undo
 +    }
 +  }
 +
 +  /**
 +   * Environment handed to {@link LatchTestRepo}: a one-shot latch that holds workers inside
 +   * {@code call} and a counter of how many workers are currently inside it.
 +   */
 +  public static class LatchTestEnv extends MultipleStoresTestEnv {
 +    public final CountDownLatch workersLatch = new CountDownLatch(1);
 +    // AtomicInteger() starts at zero
 +    public final AtomicInteger numWorkers = new AtomicInteger();
 +  }
 +
 +  /**
 +   * Hook implemented by concrete subclasses to run a test body written against
 +   * {@link SleepingTestEnv}.
 +   *
 +   * @param testMethod the test body; its {@code execute} method takes a
 +   *        {@code TestStoreFactory<SleepingTestEnv>}
 +   * @throws Exception anything thrown while running the test body
 +   */
 +  protected abstract void executeSleepingEnvTest(
 +      MultipleStoresTestExecutor<SleepingTestEnv> testMethod) throws Exception;
 +
 +  protected abstract void 
executeLatchEnvTest(MultipleStoresTestExecutor<LatchTestEnv> testMethod)
 +      throws Exception;
 +
 +  /**
 +   * Creates a {@link FateStore} bound to a given lock id.
 +   *
 +   * <p>
 +   * Marked {@code @FunctionalInterface}, like {@link MultipleStoresTestExecutor}, so the compiler
 +   * rejects any change that would add a second abstract method.
 +   *
 +   * @param <T> the test environment type the created store works with
 +   */
 +  @FunctionalInterface
 +  protected interface TestStoreFactory<T extends MultipleStoresTestEnv> {
 +    /**
 +     * @param lockID the lock id the new store is created with
 +     * @param isLockHeld predicate handed to the store for deciding whether a lock id is held
 +     * @return the newly created store
 +     */
 +    FateStore<T> create(ZooUtil.LockID lockID, Predicate<ZooUtil.LockID> isLockHeld)
 +        throws InterruptedException, KeeperException;
 +  }
 +
 +  /**
 +   * A test body that is parameterized by the factory used to create its {@link FateStore}s.
 +   *
 +   * @param <T> the test environment type used by the test body
 +   */
 +  @FunctionalInterface
 +  protected interface MultipleStoresTestExecutor<T extends MultipleStoresTestEnv> {
 +    /**
 +     * Runs the test body.
 +     *
 +     * @param fateStoreFactory factory the test body uses to create its stores
 +     * @throws Exception anything thrown by the test body
 +     */
 +    void execute(TestStoreFactory<T> fateStoreFactory) throws Exception;
 +  }
 +
 +  protected static class MultipleStoresTestEnv extends FateTestRunner.TestEnv 
{}
 +}

Reply via email to