This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit e9de98b240ed724e38d9630fc0577bc044e4e581
Merge: 486cddb356 552c7a15b6
Author: Dave Marion <[email protected]>
AuthorDate: Thu Dec 18 22:06:04 2025 +0000

    Merge branch '2.1'

 .../java/org/apache/accumulo/core/fate/Fate.java   |  9 ++-
 .../apache/accumulo/core/fate/FateExecutor.java    | 62 ++++++++++++++++--
 .../org/apache/accumulo/test/fate/FateITBase.java  | 76 +++++++++++++++++++++-
 3 files changed, 136 insertions(+), 11 deletions(-)
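
[Editorial note, not part of the patch] The Fate.java changes below replace the old
shutdown(boolean wait) with shutdown(long timeout, TimeUnit timeUnit) and add close().
A minimal caller-side sketch of the new contract; the class and method names here are
invented for illustration, while the two calls are the ones added in the diff.

    import java.util.concurrent.TimeUnit;

    import org.apache.accumulo.core.fate.Fate;

    class FateShutdownExample {
      static <T> void stopAndClose(Fate<T> fate) {
        fate.shutdown(10, TimeUnit.MINUTES); // wait up to 10 minutes for background threads to stop
        fate.close();                        // initiate shutdown without waiting, then close the store
      }
    }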

diff --cc core/src/main/java/org/apache/accumulo/core/fate/Fate.java
index e928cbd154,684fd69cc0..651e1ca838
--- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@@ -204,32 -178,38 +204,33 @@@ public class Fate<T> 
            }
          }
        }
 -    }
  
 -    /**
 -     * The Hadoop Filesystem registers a java shutdown hook that closes the 
file system. This can
 -     * cause threads to get spurious IOException. If this happens, instead of 
failing a FATE
 -     * transaction just wait for process to die. When the manager start 
elsewhere the FATE
 -     * transaction can resume.
 -     */
 -    private void blockIfHadoopShutdown(long tid, Exception e) {
 -      if (ShutdownUtil.isShutdownInProgress()) {
 -        String tidStr = FateTxId.formatTid(tid);
 -
 -        if (e instanceof AcceptableException) {
 -          log.debug("Ignoring exception possibly caused by Hadoop Shutdown 
hook. {} ", tidStr, e);
 -        } else if (isIOException(e)) {
 -          log.info("Ignoring exception likely caused by Hadoop Shutdown hook. 
{} ", tidStr, e);
 -        } else {
 -          // sometimes code will catch an IOException caused by the hadoop 
shutdown hook and throw
 -          // another exception without setting the cause.
 -          log.warn("Ignoring exception possibly caused by Hadoop Shutdown 
hook. {} ", tidStr, e);
 +      // replacement task: at this point, the existing FateExecutors that were invalidated by the
 +      // config changes have started or finished shutting down. Now create any new replacement
 +      // FateExecutors that are needed
 +      for (var poolConfig : poolConfigs.entrySet()) {
 +        Set<FateOperation> fateOps = poolConfig.getKey();
 +        Map.Entry<String,Integer> fateExecNameAndPoolSize = 
poolConfig.getValue();
 +        String fateExecutorName = fateExecNameAndPoolSize.getKey();
 +        int poolSize = fateExecNameAndPoolSize.getValue();
 +        synchronized (fateExecutors) {
 +          if (fateExecutors.stream().noneMatch(
 +              fe -> fe.getFateOps().equals(fateOps) && 
fe.getName().equals(fateExecutorName))) {
-             log.debug("[{}] Adding FateExecutor for {}", store.type(), 
fateOps);
++            log.debug("[{}] Adding FateExecutor for {} with {} threads", 
store.type(), fateOps,
++                poolSize);
 +            fateExecutors.add(
 +                new FateExecutor<>(Fate.this, environment, fateOps, poolSize, 
fateExecutorName));
 +          }
          }
 +      }
  
 -        while (true) {
 -          // Nothing is going to work well at this point, so why even try. 
Just wait for the end,
 -          // preventing this FATE thread from processing further work and 
likely failing.
 -          UtilWaitThread.sleepUninterruptibly(1, MINUTES);
 +      // resize task: see description for FateExecutor.resizeFateExecutor
 +      synchronized (fateExecutors) {
 +        for (var fateExecutor : fateExecutors) {
 +          if (fateExecutor.isShutdown()) {
 +            continue;
 +          }
 +          fateExecutor.resizeFateExecutor(poolConfigs, 
idleCheckIntervalMillis);
          }
        }
      }
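
[Editorial note, not part of the patch] The replacement/resize task above iterates a
poolConfigs map whose shape can be read off the hunk: each key is a set of fate
operations and each value pairs an executor name with a pool size. A minimal sketch of
building one such entry; the class, method, the name "user.all", and the size 4 are
invented for illustration.

    import java.util.Map;
    import java.util.Set;

    import org.apache.accumulo.core.fate.Fate;

    class PoolConfigShapeExample {
      // one hypothetical entry: the given fate ops handled by an executor named "user.all"
      // with a pool of 4 threads
      static Map<Set<Fate.FateOperation>,Map.Entry<String,Integer>>
          singlePool(Set<Fate.FateOperation> fateOps) {
        return Map.of(fateOps, Map.entry("user.all", 4));
      }
    }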
@@@ -498,98 -467,24 +499,100 @@@
    }
  
    /**
 -   * Flags that FATE threadpool to clear out and end. Does not actively stop 
running FATE processes.
 +   * Lists transactions for a given fate key type.
     */
 -  public void shutdown(boolean wait) {
 -    log.info("Shutdown called on Fate, waiting: {}", wait);
 +  public Stream<FateKey> list(FateKey.FateKeyType type) {
 +    return store.list(type);
 +  }
 +
 +  /**
 +   * Initiates shutdown of the background threads that run fate operations and clean up fate data,
 +   * and optionally waits on them. Leaves the fate object in a state where it can still update and
 +   * read fate data, for example adding a new fate operation or getting the status of an existing one.
 +   */
 +  public void shutdown(long timeout, TimeUnit timeUnit) {
-     log.info("Shutting down {} FATE", store.type());
- 
++    log.info("Shutting down {} FATE, waiting: {} seconds", store.type(),
++        TimeUnit.SECONDS.convert(timeout, timeUnit));
+     // important that this is set before shutdownNow is called, as the background
+     // threads will check it to see if shutdown-related errors should be ignored.
 -    keepRunning.set(false);
 -    if (executor == null) {
 -      return;
 +    if (keepRunning.compareAndSet(true, false)) {
 +      synchronized (fateExecutors) {
 +        for (var fateExecutor : fateExecutors) {
 +          fateExecutor.initiateShutdown();
 +        }
 +      }
 +      if (deadResCleanerExecutor != null) {
 +        deadResCleanerExecutor.shutdown();
 +      }
 +      fatePoolsWatcherFuture.cancel(false);
      }
 -    executor.shutdownNow();
 -    if (wait) {
 -      while (!executor.isTerminated()) {
 -        try {
 -          executor.awaitTermination(1, SECONDS);
 -        } catch (InterruptedException e) {
 -          throw new IllegalStateException(e);
 +
 +    if (timeout > 0) {
 +      long start = System.nanoTime();
 +      try {
 +        waitForAllFateExecShutdown(start, timeout, timeUnit);
 +        waitForDeadResCleanerShutdown(start, timeout, timeUnit);
 +
 +        if (anyFateExecutorIsAlive() || deadResCleanerIsAlive()) {
 +          log.warn(
 +              "Waited for {}ms for all fate {} background threads to stop, 
but some are still running. "
 +                  + "fate executor threads:{} dead reservation cleaner 
thread:{} ",
 +              TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start), 
store.type(),
 +              anyFateExecutorIsAlive(), deadResCleanerIsAlive());
          }
 +      } catch (InterruptedException e) {
 +        throw new RuntimeException(e);
 +      }
 +    }
 +
 +    // interrupt the background threads
 +    synchronized (fateExecutors) {
 +      var fateExecutorsIter = fateExecutors.iterator();
 +      while (fateExecutorsIter.hasNext()) {
 +        var fateExecutor = fateExecutorsIter.next();
 +        fateExecutor.shutdownNow();
 +        fateExecutor.getIdleCountHistory().clear();
 +        fateExecutorsIter.remove();
 +      }
 +    }
 +    if (deadResCleanerExecutor != null) {
 +      deadResCleanerExecutor.shutdownNow();
 +    }
 +  }
 +
 +  /**
 +   * Initiates shutdown of all fate threads and prevents reads and updates of fate's persisted data.
 +   */
 +  public void close() {
 +    shutdown(0, SECONDS);
 +    store.close();
 +  }
 +
 +  private boolean anyFateExecutorIsAlive() {
 +    synchronized (fateExecutors) {
 +      return fateExecutors.stream().anyMatch(FateExecutor::isAlive);
 +    }
 +  }
 +
 +  private boolean deadResCleanerIsAlive() {
 +    return deadResCleanerExecutor != null && 
!deadResCleanerExecutor.isTerminated();
 +  }
 +
 +  private void waitForAllFateExecShutdown(long start, long timeout, TimeUnit 
timeUnit)
 +      throws InterruptedException {
 +    synchronized (fateExecutors) {
 +      for (var fateExecutor : fateExecutors) {
 +        fateExecutor.waitForShutdown(start, timeout, timeUnit);
 +      }
 +    }
 +  }
 +
 +  private void waitForDeadResCleanerShutdown(long start, long timeout, 
TimeUnit timeUnit)
 +      throws InterruptedException {
 +    while (((System.nanoTime() - start) < timeUnit.toNanos(timeout)) && 
deadResCleanerIsAlive()) {
 +      if (deadResCleanerExecutor != null && 
!deadResCleanerExecutor.awaitTermination(1, SECONDS)) {
 +        log.debug("Fate {} is waiting for dead reservation cleaner thread to 
terminate",
 +            store.type());
        }
      }
    }
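
[Editorial note, not part of the patch] The new shutdown path above waits for background
threads with a deadline: it re-checks an overall time budget while polling awaitTermination
in one-second slices (see waitForAllFateExecShutdown and waitForDeadResCleanerShutdown).
A self-contained sketch of that pattern; the class and method names are invented for
illustration.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.TimeUnit;

    class BoundedWaitExample {
      // poll in small slices so the caller's overall deadline is honored even when several
      // executors are waited on one after another
      static boolean awaitWithDeadline(ExecutorService pool, long startNanos, long timeout,
          TimeUnit unit) throws InterruptedException {
        while ((System.nanoTime() - startNanos) < unit.toNanos(timeout) && !pool.isTerminated()) {
          pool.awaitTermination(1, TimeUnit.SECONDS);
        }
        return pool.isTerminated();
      }
    }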
diff --cc core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java
index 5797888043,0000000000..bbf0bcb81e
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java
@@@ -1,619 -1,0 +1,669 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.core.fate;
 +
 +import static 
com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 +import static java.util.concurrent.TimeUnit.MILLISECONDS;
 +import static java.util.concurrent.TimeUnit.MINUTES;
 +import static java.util.concurrent.TimeUnit.SECONDS;
 +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED;
 +import static 
org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED_IN_PROGRESS;
 +import static 
org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.IN_PROGRESS;
 +import static 
org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUBMITTED;
 +import static 
org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUCCESSFUL;
 +import static org.apache.accumulo.core.util.ShutdownUtil.isIOException;
 +
 +import java.time.Duration;
 +import java.util.Collections;
 +import java.util.HashSet;
 +import java.util.Map;
 +import java.util.Optional;
 +import java.util.Set;
 +import java.util.concurrent.ConcurrentLinkedQueue;
 +import java.util.concurrent.LinkedTransferQueue;
 +import java.util.concurrent.RejectedExecutionException;
 +import java.util.concurrent.ThreadPoolExecutor;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.TransferQueue;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.concurrent.atomic.AtomicInteger;
 +
 +import 
org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.fate.Fate.TxInfo;
 +import org.apache.accumulo.core.fate.FateStore.FateTxStore;
 +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
 +import org.apache.accumulo.core.util.ShutdownUtil;
 +import org.apache.accumulo.core.util.Timer;
 +import org.apache.accumulo.core.util.threads.ThreadPoolNames;
 +import org.apache.accumulo.core.util.threads.ThreadPools;
 +import org.apache.accumulo.core.util.threads.Threads;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.base.Preconditions;
 +
 +/**
 + * Handles finding and working on FATE work. Only finds/works on fate operations that it is assigned
 + * to work on, as defined by 'fateOps'. These executors may be stopped and new ones started throughout
 + * FATE's life, depending on changes to {@link Property#MANAGER_FATE_USER_CONFIG} and
 + * {@link Property#MANAGER_FATE_META_CONFIG}.
 + */
 +public class FateExecutor<T> {
 +  private static final Logger log = 
LoggerFactory.getLogger(FateExecutor.class);
 +  private final Logger runnerLog = 
LoggerFactory.getLogger(TransactionRunner.class);
 +
 +  private final T environment;
 +  private final Fate<T> fate;
 +  private final Thread workFinder;
 +  private final TransferQueue<FateId> workQueue;
 +  private final AtomicInteger idleWorkerCount;
 +  private final String name;
 +  private final String poolName;
 +  private final ThreadPoolExecutor transactionExecutor;
 +  private final Set<TransactionRunner> runningTxRunners;
 +  private final Set<Fate.FateOperation> fateOps;
 +  private final ConcurrentLinkedQueue<Integer> idleCountHistory = new 
ConcurrentLinkedQueue<>();
 +  private final FateExecutorMetrics<T> fateExecutorMetrics;
 +
 +  public FateExecutor(Fate<T> fate, T environment, Set<Fate.FateOperation> 
fateOps, int poolSize,
 +      String name) {
 +    final FateInstanceType type = fate.getStore().type();
 +    final String typeStr = type.name().toLowerCase();
 +    final String poolName =
 +        ThreadPoolNames.MANAGER_FATE_POOL_PREFIX.poolName + typeStr + "." + 
name;
 +    final String workFinderThreadName = "fate.work.finder." + typeStr + "." + 
name;
 +
 +    this.fate = fate;
 +    this.environment = environment;
 +    this.fateOps = Collections.unmodifiableSet(fateOps);
 +    this.workQueue = new LinkedTransferQueue<>();
 +    this.runningTxRunners = Collections.synchronizedSet(new HashSet<>());
 +    this.name = name;
 +    this.poolName = poolName;
 +    this.transactionExecutor = 
ThreadPools.getServerThreadPools().getPoolBuilder(poolName)
 +        .numCoreThreads(poolSize).build();
 +    this.idleWorkerCount = new AtomicInteger(0);
 +    this.fateExecutorMetrics =
 +        new FateExecutorMetrics<T>(type, poolName, runningTxRunners, 
idleWorkerCount);
 +
 +    this.workFinder = Threads.createCriticalThread(workFinderThreadName, new 
WorkFinder());
 +    this.workFinder.start();
 +  }
 +
 +  /**
 +   * Resizes the pool to match the config as necessary: submits new TransactionRunners if the pool
 +   * grew, stops TransactionRunners if the pool shrank, and potentially suggests resizing the pool if
 +   * the load is consistently high.
 +   */
 +  protected void resizeFateExecutor(
 +      Map<Set<Fate.FateOperation>,Map.Entry<String,Integer>> poolConfigs,
 +      long idleCheckIntervalMillis) {
 +    final int configured = poolConfigs.get(fateOps).getValue();
 +    ThreadPools.resizePool(transactionExecutor, () -> configured, poolName);
 +    synchronized (runningTxRunners) {
 +      final int running = runningTxRunners.size();
 +      final int needed = configured - running;
 +      log.trace("resizing pools configured:{} running:{} needed:{} 
fateOps:{}", configured, running,
 +          needed, fateOps);
 +      if (needed > 0) {
 +        // If the pool grew, then ensure that there is a TransactionRunner 
for each thread
 +        for (int i = 0; i < needed; i++) {
 +          final TransactionRunner tr = new TransactionRunner();
 +          try {
 +            runningTxRunners.add(tr);
 +            transactionExecutor.execute(tr);
 +          } catch (RejectedExecutionException e) {
 +            runningTxRunners.remove(tr);
 +            // RejectedExecutionException could be shutting down
 +            if (transactionExecutor.isShutdown()) {
 +              // The exception is expected in this case, no need to spam the 
logs.
 +              log.trace("Expected error adding transaction runner to FaTE 
executor pool. "
 +                  + "The pool is shutdown.", e);
 +            } else {
 +              // This is bad, FaTE may no longer work!
 +              log.error("Unexpected error adding transaction runner to FaTE 
executor pool.", e);
 +            }
 +            break;
 +          }
 +        }
 +        idleCountHistory.clear();
 +      } else if (needed < 0) {
 +        // If we need the pool to shrink, then ensure excess TransactionRunners are safely
 +        // stopped. Flag the necessary number of TransactionRunners to safely stop when they are
 +        // done working on a transaction.
 +        int numFlagged = (int) runningTxRunners.stream()
 +            .filter(FateExecutor.TransactionRunner::isFlaggedToStop).count();
 +        int numToStop = -1 * (numFlagged + needed);
 +        for (var runner : runningTxRunners) {
 +          if (numToStop <= 0) {
 +            break;
 +          }
 +          if (runner.flagStop()) {
 +            log.trace("Flagging a TransactionRunner to stop...");
 +            numToStop--;
 +          }
 +        }
 +      } else {
 +        // The pool size did not change, but should it based on idle Fate 
threads? Maintain
 +        // count of the last X minutes of idle Fate threads. If zero 95% of 
the time, then
 +        // suggest that the pool size be increased or the fate ops assigned 
to that pool be
 +        // split into separate pools.
 +        final long interval =
 +            Math.min(60, 
TimeUnit.MILLISECONDS.toMinutes(idleCheckIntervalMillis));
 +        var fateConfigProp = Fate.getFateConfigProp(fate.getStore().type());
 +
 +        if (interval == 0) {
 +          idleCountHistory.clear();
 +        } else {
 +          if (idleCountHistory.size() >= interval * 2) { // this task runs 
every 30s
 +            int zeroFateThreadsIdleCount = 0;
 +            for (Integer idleConsumerCount : idleCountHistory) {
 +              if (idleConsumerCount == 0) {
 +                zeroFateThreadsIdleCount++;
 +              }
 +            }
 +            boolean needMoreThreads =
 +                (zeroFateThreadsIdleCount / (double) idleCountHistory.size()) 
>= 0.95;
 +            if (needMoreThreads) {
 +              fate.getNeedMoreThreadsWarnCount().incrementAndGet();
 +              log.warn(
 +                  "All {} Fate threads working on the fate ops {} appear to 
be busy for "
 +                      + "the last {} minutes. Consider increasing the value 
for the "
 +                      + "entry in the property {} or splitting the fate ops 
across "
 +                      + "multiple entries/pools.",
 +                  fate.getStore().type(), fateOps, interval, 
fateConfigProp.getKey());
 +              // Clear the history so that we don't log for interval minutes.
 +              idleCountHistory.clear();
 +            } else {
 +              while (idleCountHistory.size() >= interval * 2) {
 +                idleCountHistory.remove();
 +              }
 +            }
 +          }
 +          idleCountHistory.add(getIdleWorkerCount());
 +        }
 +      }
 +    }
 +  }
 +
 +  protected String getName() {
 +    return name;
 +  }
 +
 +  private int getIdleWorkerCount() {
 +    // This could call workQueue.getWaitingConsumerCount() if other code used poll with a timeout
 +    return idleWorkerCount.get();
 +  }
 +
 +  /**
 +   * @return the number of currently running transaction runners
 +   */
 +  protected int getNumRunningTxRunners() {
 +    return runningTxRunners.size();
 +  }
 +
 +  protected Set<Fate.FateOperation> getFateOps() {
 +    return fateOps;
 +  }
 +
 +  public FateExecutorMetrics<T> getFateExecutorMetrics() {
 +    return fateExecutorMetrics;
 +  }
 +
 +  /**
 +   * Initiates the shutdown of this FateExecutor. This means the pool executing TransactionRunners
 +   * will no longer accept new TransactionRunners, the currently running TransactionRunners will
 +   * terminate after they are done with their current transaction (if applicable), the work finder
 +   * is shut down, and the metrics created for this FateExecutor are removed from the registry (if
 +   * metrics were enabled). {@link #isShutdown()} returns true after this is called.
 +   */
 +  protected void initiateShutdown() {
 +    log.debug("Initiated shutdown {}", fateOps);
 +    transactionExecutor.shutdown();
 +    synchronized (runningTxRunners) {
 +      runningTxRunners.forEach(TransactionRunner::flagStop);
 +    }
 +    fateExecutorMetrics.clearMetrics();
 +    // work finder will terminate since this.isShutdown() is true
 +  }
 +
 +  /**
 +   * @return true if {@link #initiateShutdown()} has previously been called 
on this FateExecutor.
 +   *         The FateExecutor may or may not still have running threads. To 
check that, see
 +   *         {@link #isAlive()}
 +   */
 +  protected boolean isShutdown() {
 +    return transactionExecutor.isShutdown();
 +  }
 +
 +  protected void shutdownNow() {
 +    transactionExecutor.shutdownNow();
 +  }
 +
 +  protected void waitForShutdown(long start, long timeout, TimeUnit timeUnit)
 +      throws InterruptedException {
 +    if (timeout > 0) {
 +      while (((System.nanoTime() - start) < timeUnit.toNanos(timeout)) && 
isAlive()) {
 +        if (!transactionExecutor.awaitTermination(1, SECONDS)) {
 +          log.debug("Fate {} is waiting for {} worker threads for fate ops {} 
to terminate",
 +              fate.getStore().type(), runningTxRunners.size(), fateOps);
 +          continue;
 +        }
 +
 +        workFinder.join(1_000);
 +        if (workFinder.isAlive()) {
 +          log.debug("Fate {} is waiting for work finder thread for fate ops 
{} to terminate",
 +              fate.getStore().type(), fateOps);
 +          workFinder.interrupt();
 +        }
 +      }
 +
 +      if (isAlive()) {
 +        log.warn(
 +            "Waited for {}ms for the {} fate executor operating on fate ops 
{} to stop, but it"
 +                + " is still running. Summary of run state of its threads: 
work finder:{}"
 +                + " transaction executor:{}",
 +            TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start), 
fate.getStore().type(),
 +            fateOps, workFinder.isAlive(), 
!transactionExecutor.isTerminated());
 +      }
 +    }
 +  }
 +
 +  /**
 +   * This fate executor is defined as being alive if any of its threads are 
running
 +   */
 +  protected boolean isAlive() {
 +    return !transactionExecutor.isTerminated() || workFinder.isAlive();
 +  }
 +
 +  protected ConcurrentLinkedQueue<Integer> getIdleCountHistory() {
 +    return idleCountHistory;
 +  }
 +
 +  /**
 +   * A single thread that finds transactions to work on and queues them up. 
Do not want each worker
 +   * thread going to the store and looking for work as it would place more 
load on the store.
 +   */
 +  private class WorkFinder implements Runnable {
 +
 +    @Override
 +    public void run() {
 +      while (fate.getKeepRunning().get() && !isShutdown()) {
 +        try {
 +          fate.getStore().runnable(fate.getKeepRunning(), fateIdStatus -> {
 +            // The FateId with the fate operation 'fateOp' is workable by 
this FateExecutor if
 +            // 1) This FateExecutor is assigned to work on 'fateOp' ('fateOp' 
is in 'fateOps')
 +            // 2) The transaction was cancelled while NEW. This is an edge 
case that needs to be
 +            // handled since this is the only case where we will have a 
runnable transaction
 +            // that doesn't have a name. We allow any FateExecutor to work on 
this since this case
 +            // should be rare and won't put much load on any one FateExecutor
 +            var status = fateIdStatus.getStatus();
 +            var fateOp = fateIdStatus.getFateOperation().orElse(null);
 +            if ((fateOp != null && fateOps.contains(fateOp))
 +                || txCancelledWhileNew(status, fateOp)) {
 +              while (fate.getKeepRunning().get() && !isShutdown()) {
 +                try {
 +                  // The reason for calling transfer instead of queueing is to avoid rescanning the
 +                  // storage layer and adding the same thing over and over. For example, if all
 +                  // threads were busy, the queue size was 100, and there were three runnable things
 +                  // in the store, we would not want to keep scanning the store and adding those same
 +                  // 3 runnable things until the queue is full.
 +                  if (workQueue.tryTransfer(fateIdStatus.getFateId(), 100, 
MILLISECONDS)) {
 +                    break;
 +                  }
 +                } catch (InterruptedException e) {
 +                  throw new IllegalStateException(e);
 +                }
 +              }
 +            }
 +          });
 +        } catch (Exception e) {
 +          log.warn("Unexpected failure while attempting to find work for 
fate", e);
 +          workQueue.clear();
 +        }
 +      }
 +
 +      log.debug(
 +          "FATE work finder for ops {} is gracefully exiting: either FATE is "
 +              + "being shutdown ({}) and therefore all FATE threads are being 
shutdown or the "
 +              + "FATE threads for the specific ops are being shutdown (due to 
FATE shutdown, "
 +              + "or due to FATE config changes) ({})",
 +          fateOps, !fate.getKeepRunning().get(), isShutdown());
 +    }
 +
 +    private boolean txCancelledWhileNew(TStatus status, Fate.FateOperation 
fateOp) {
 +      if (fateOp == null) {
 +        // The only time a transaction should be runnable and not have a fate 
operation is if
 +        // it was cancelled while NEW (and now is FAILED_IN_PROGRESS)
 +        Preconditions.checkState(status == FAILED_IN_PROGRESS);
 +        return true;
 +      }
 +      return false;
 +    }
 +  }
 +
 +  protected class TransactionRunner implements Runnable {
 +
 +    // used to signal a TransactionRunner to stop in the case where there are too many running,
 +    // i.e.,
 +    // 1. the property for the pool size decreased, so we have to stop excess TransactionRunners
 +    // or
 +    // 2. this FateExecutor is no longer valid due to config changes, so we need to shut down this
 +    // FateExecutor
 +    private final AtomicBoolean stop = new AtomicBoolean(false);
 +    private volatile Long threadId = null;
 +
 +    private Optional<FateTxStore<T>> reserveFateTx() throws 
InterruptedException {
 +      idleWorkerCount.getAndIncrement();
 +      try {
 +        while (fate.getKeepRunning().get() && !stop.get()) {
 +          // Because of JDK-8301341 can not use poll w/ timeout until JDK 21+
 +          FateId unreservedFateId = workQueue.poll();
 +
 +          if (unreservedFateId == null) {
 +            Thread.sleep(1);
 +            continue;
 +          }
 +          var optionalopStore = fate.getStore().tryReserve(unreservedFateId);
 +          if (optionalopStore.isPresent()) {
 +            return optionalopStore;
 +          }
 +        }
 +      } finally {
 +        idleWorkerCount.decrementAndGet();
 +      }
 +
 +      return Optional.empty();
 +    }
 +
++    private boolean isInterruptedException(Throwable e) {
++      if (e == null) {
++        return false;
++      }
++
++      if (e instanceof InterruptedException) {
++        return true;
++      }
++
++      for (Throwable suppressed : e.getSuppressed()) {
++        if (isInterruptedException(suppressed)) {
++          return true;
++        }
++      }
++
++      return isInterruptedException(e.getCause());
++    }
++
 +    @Override
 +    public void run() {
 +      runnerLog.trace("A TransactionRunner is starting for {} {} ", 
fate.getStore().type(),
 +          fateOps);
 +      threadId = Thread.currentThread().getId();
 +      try {
 +        while (fate.getKeepRunning().get() && !isShutdown() && !stop.get()) {
 +          FateTxStore<T> txStore = null;
 +          ExecutionState state = new ExecutionState();
 +          try {
 +            var optionalopStore = reserveFateTx();
 +            if (optionalopStore.isPresent()) {
 +              txStore = optionalopStore.orElseThrow();
 +            } else {
 +              continue;
 +            }
 +            state.status = txStore.getStatus();
 +            state.op = txStore.top();
 +            runnerLog.trace("Processing FATE transaction {} id: {} status: 
{}",
 +                state.op == null ? null : state.op.getName(), 
txStore.getID(), state.status);
 +            if (state.status == FAILED_IN_PROGRESS) {
 +              processFailed(txStore, state.op);
 +            } else if (state.status == SUBMITTED || state.status == 
IN_PROGRESS) {
 +              try {
 +                execute(txStore, state);
++                // It's possible that a Fate operation impl
++                // may not do the right thing with an
++                // InterruptedException.
++                if (Thread.currentThread().isInterrupted()) {
++                  throw new InterruptedException("Fate Transaction Runner 
thread interrupted");
++                }
 +                if (state.op != null && state.deferTime != 0) {
 +                  // The current op is not ready to execute
 +                  continue;
 +                }
 +              } catch (StackOverflowException e) {
 +                // the op that failed to push onto the stack was never 
executed, so no need to undo
 +                // it just transition to failed and undo the ops that executed
 +                transitionToFailed(txStore, e);
 +                continue;
 +              } catch (Exception e) {
-                 blockIfHadoopShutdown(txStore.getID(), e);
-                 transitionToFailed(txStore, e);
-                 continue;
++                if (!isInterruptedException(e)) {
++                  blockIfHadoopShutdown(txStore.getID(), e);
++                  transitionToFailed(txStore, e);
++                  continue;
++                } else {
++                  if (fate.getKeepRunning().get()) {
++                    throw e;
++                  } else {
++                    // If we are shutting down then Fate.shutdown was called
++                    // and ExecutorService.shutdownNow was called resulting
++                    // in this exception. We will exit at the top of the loop.
++                    Thread.interrupted();
++                    continue;
++                  }
++                }
 +              }
 +
 +              if (state.op == null) {
 +                // transaction is finished
 +                String ret = state.prevOp.getReturn();
 +                if (ret != null) {
 +                  txStore.setTransactionInfo(TxInfo.RETURN_VALUE, ret);
 +                }
 +                txStore.setStatus(SUCCESSFUL);
 +                doCleanUp(txStore);
 +              }
 +            }
 +          } catch (Exception e) {
 +            String name = state.op == null ? null : state.op.getName();
 +            FateId txid = txStore == null ? null : txStore.getID();
-             runnerLog.error(
-                 "Uncaught exception in FATE runner thread processing {} id: 
{} status: {}", name,
-                 txid, state.status, e);
++            if (isInterruptedException(e)) {
++              if (fate.getKeepRunning().get()) {
++                runnerLog.error(
++                    "Uncaught InterruptedException in FATE runner thread 
processing {} id: {} status: {}",
++                    name, txid, state.status, e);
++              } else {
++                // If we are shutting down then Fate.shutdown was called
++                // and ExecutorService.shutdownNow was called resulting
++                // in this exception. We will exit at the top of the loop,
++                // so continue this loop iteration normally.
++                Thread.interrupted();
++              }
++            } else {
++              runnerLog.error(
++                  "Uncaught exception in FATE runner thread processing {} id: 
{} status: {}", name,
++                  txid, state.status, e);
++            }
 +          } finally {
 +            if (txStore != null) {
 +              if (runnerLog.isTraceEnabled()) {
 +                String name = state.op == null ? null : state.op.getName();
 +                runnerLog.trace("Completed FATE transaction {} id: {} status: 
{}", name,
 +                    txStore.getID(), state.status);
 +              }
 +              txStore.unreserve(Duration.ofMillis(state.deferTime));
 +            }
 +          }
 +        }
 +      } finally {
 +        log.trace("A TransactionRunner is exiting for {} {}", 
fate.getStore().type(), fateOps);
 +        Preconditions.checkState(runningTxRunners.remove(this));
 +        threadId = null;
 +      }
 +    }
 +
 +    private class ExecutionState {
 +      Repo<T> prevOp = null;
 +      Repo<T> op = null;
 +      long deferTime = 0;
 +      TStatus status;
 +    }
 +
 +    // Executes as many steps of a fate operation as possible
 +    private void execute(final FateTxStore<T> txStore, final ExecutionState 
state)
 +        throws Exception {
 +      while (state.op != null && state.deferTime == 0) {
 +        state.deferTime = executeIsReady(txStore.getID(), state.op);
 +
 +        if (state.deferTime == 0) {
 +          if (state.status == SUBMITTED) {
 +            txStore.setStatus(IN_PROGRESS);
 +            state.status = IN_PROGRESS;
 +          }
 +
 +          state.prevOp = state.op;
 +          state.op = executeCall(txStore.getID(), state.op);
 +
 +          if (state.op != null) {
 +            // persist the completion of this step before starting to run the 
next so in the case of
 +            // process death the completed steps are not rerun
 +            txStore.push(state.op);
 +          }
 +        }
 +      }
 +    }
 +
 +    /**
 +     * The Hadoop Filesystem registers a java shutdown hook that closes the file system. This can
 +     * cause threads to get spurious IOExceptions. If this happens, instead of failing a FATE
 +     * transaction, just wait for the process to die. When the manager starts elsewhere, the FATE
 +     * transaction can resume.
 +     */
 +    private void blockIfHadoopShutdown(FateId fateId, Exception e) {
 +      if (ShutdownUtil.isShutdownInProgress()) {
 +
 +        if (e instanceof AcceptableException) {
 +          log.debug("Ignoring exception possibly caused by Hadoop Shutdown 
hook. {} ", fateId, e);
 +        } else if (isIOException(e)) {
 +          log.info("Ignoring exception likely caused by Hadoop Shutdown hook. 
{} ", fateId, e);
 +        } else {
 +          // sometimes code will catch an IOException caused by the hadoop 
shutdown hook and throw
 +          // another exception without setting the cause.
 +          log.warn("Ignoring exception possibly caused by Hadoop Shutdown 
hook. {} ", fateId, e);
 +        }
 +
 +        while (true) {
 +          // Nothing is going to work well at this point, so why even try. 
Just wait for the end,
 +          // preventing this FATE thread from processing further work and 
likely failing.
 +          sleepUninterruptibly(1, MINUTES);
 +        }
 +      }
 +    }
 +
 +    private void transitionToFailed(FateTxStore<T> txStore, Exception e) {
 +      final String msg = "Failed to execute Repo " + txStore.getID();
 +      // Certain FATE ops that throw exceptions don't need to be propagated 
up to the Monitor
 +      // as a warning. They're a normal, handled failure condition.
 +      if (e instanceof AcceptableException) {
 +        var tableOpEx = (AcceptableThriftTableOperationException) e;
 +        log.info("{} for table:{}({}) saw acceptable exception: {}", msg, 
tableOpEx.getTableName(),
 +            tableOpEx.getTableId(), tableOpEx.getDescription());
 +      } else {
 +        log.warn(msg, e);
 +      }
 +      txStore.setTransactionInfo(TxInfo.EXCEPTION, e);
 +      txStore.setStatus(FAILED_IN_PROGRESS);
 +      log.info("Updated status for Repo with {} to FAILED_IN_PROGRESS", 
txStore.getID());
 +    }
 +
 +    private void processFailed(FateTxStore<T> txStore, Repo<T> op) {
 +      while (op != null) {
 +        undo(txStore.getID(), op);
 +
 +        txStore.pop();
 +        op = txStore.top();
 +      }
 +
 +      txStore.setStatus(FAILED);
 +      doCleanUp(txStore);
 +    }
 +
 +    private void doCleanUp(FateTxStore<T> txStore) {
 +      Boolean autoClean = (Boolean) 
txStore.getTransactionInfo(TxInfo.AUTO_CLEAN);
 +      if (autoClean != null && autoClean) {
 +        txStore.delete();
 +      } else {
 +        // no longer need persisted operations, so delete them to save space 
in case
 +        // TX is never cleaned up...
 +        while (txStore.top() != null) {
 +          txStore.pop();
 +        }
 +      }
 +    }
 +
 +    private void undo(FateId fateId, Repo<T> op) {
 +      try {
 +        op.undo(fateId, environment);
 +      } catch (Exception e) {
 +        log.warn("Failed to undo Repo, " + fateId, e);
 +      }
 +    }
 +
 +    protected boolean flagStop() {
 +      boolean setStop = stop.compareAndSet(false, true);
 +      if (setStop) {
 +        runnerLog.trace("set stop for {}", threadId);
 +      }
 +      return setStop;
 +    }
 +
 +    protected boolean isFlaggedToStop() {
 +      return stop.get();
 +    }
 +
 +    @Override
 +    public String toString() {
 +      return "threadId:" + threadId + " stop:" + stop.get();
 +    }
 +
 +  }
 +
 +  protected long executeIsReady(FateId fateId, Repo<T> op) throws Exception {
 +    var startTime = Timer.startNew();
 +    var deferTime = op.isReady(fateId, environment);
 +    log.debug("Running {}.isReady() {} took {} ms and returned {}", 
op.getName(), fateId,
 +        startTime.elapsed(MILLISECONDS), deferTime);
 +    return deferTime;
 +  }
 +
 +  protected Repo<T> executeCall(FateId fateId, Repo<T> op) throws Exception {
 +    var startTime = Timer.startNew();
 +    var next = op.call(fateId, environment);
 +    log.debug("Running {}.call() {} took {} ms and returned {}", 
op.getName(), fateId,
 +        startTime.elapsed(MILLISECONDS), next == null ? "null" : 
next.getName());
 +
 +    return next;
 +  }
 +
 +  @Override
 +  public String toString() {
 +    return 
String.format("FateExecutor:{FateOps=%s,Name=%s,PoolSize:%s,TransactionRunners:%s}",
 +        fateOps, name, runningTxRunners.size(), runningTxRunners);
 +  }
 +}
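
[Editorial note, not part of the patch] The key behavioral change in FateExecutor above is
that the TransactionRunner now classifies exceptions caused by an interrupt (walking both
the suppressed exceptions and the cause chain) so that a shutdown-triggered interrupt does
not fail the FATE transaction. A standalone sketch of that classification; the class and
method names are invented for illustration.

    class InterruptCheckExample {
      // mirrors the logic of the new TransactionRunner.isInterruptedException
      static boolean isInterrupted(Throwable e) {
        if (e == null) {
          return false;
        }
        if (e instanceof InterruptedException) {
          return true;
        }
        for (Throwable suppressed : e.getSuppressed()) {
          if (isInterrupted(suppressed)) {
            return true;
          }
        }
        return isInterrupted(e.getCause());
      }

      public static void main(String[] args) {
        // an InterruptedException wrapped in another exception is still detected via the cause chain
        System.out.println(isInterrupted(new RuntimeException(new InterruptedException()))); // true
        System.out.println(isInterrupted(new java.io.IOException("unrelated")));             // false
      }
    }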
diff --cc test/src/main/java/org/apache/accumulo/test/fate/FateITBase.java
index d965881c17,0000000000..c526cd6f75
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/FateITBase.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FateITBase.java
@@@ -1,570 -1,0 +1,642 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.test.fate;
 +
 +import static java.util.concurrent.TimeUnit.SECONDS;
 +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED;
 +import static 
org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED_IN_PROGRESS;
 +import static 
org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.IN_PROGRESS;
 +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.NEW;
 +import static 
org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUBMITTED;
 +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.UNKNOWN;
 +import static org.apache.accumulo.test.fate.FateTestUtil.TEST_FATE_OP;
 +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 +import static org.junit.jupiter.api.Assertions.assertEquals;
 +import static org.junit.jupiter.api.Assertions.assertFalse;
++import static org.junit.jupiter.api.Assertions.assertNull;
 +import static org.junit.jupiter.api.Assertions.assertThrows;
 +import static org.junit.jupiter.api.Assertions.assertTrue;
 +import static org.junit.jupiter.api.Assertions.fail;
 +
 +import java.util.ArrayList;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Set;
 +import java.util.concurrent.CountDownLatch;
 +import java.util.concurrent.ScheduledThreadPoolExecutor;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.concurrent.atomic.AtomicLong;
++import java.util.concurrent.atomic.AtomicReference;
 +
 +import org.apache.accumulo.core.conf.ConfigurationCopy;
++import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.fate.AbstractFateStore;
 +import org.apache.accumulo.core.fate.Fate;
 +import org.apache.accumulo.core.fate.FateId;
 +import org.apache.accumulo.core.fate.FateStore;
 +import org.apache.accumulo.core.fate.ReadOnlyFateStore;
 +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
 +import org.apache.accumulo.core.fate.Repo;
++import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.harness.SharedMiniClusterBase;
 +import org.apache.accumulo.server.ServerContext;
 +import org.apache.accumulo.test.fate.FateTestRunner.TestEnv;
 +import org.apache.accumulo.test.util.Wait;
 +import org.junit.jupiter.api.Test;
 +import org.junit.jupiter.api.Timeout;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +public abstract class FateITBase extends SharedMiniClusterBase implements 
FateTestRunner<TestEnv> {
 +
 +  private static final Logger LOG = LoggerFactory.getLogger(FateITBase.class);
 +
 +  private static CountDownLatch callStarted;
 +  private static CountDownLatch finishCall;
 +  private static CountDownLatch undoLatch;
++  private static AtomicReference<Throwable> interruptedException = new 
AtomicReference<>();
 +
 +  private enum ExceptionLocation {
 +    CALL, IS_READY
 +  }
 +
 +  public static class TestRepo implements Repo<TestEnv> {
 +    private static final long serialVersionUID = 1L;
 +
 +    private final String data;
 +
 +    public TestRepo() {
 +      this("test");
 +    }
 +
 +    public TestRepo(String data) {
 +      this.data = data;
 +    }
 +
 +    @Override
 +    public long isReady(FateId fateId, TestEnv environment) throws Exception {
 +      return 0;
 +    }
 +
 +    @Override
 +    public String getName() {
 +      return "TestRepo_" + data;
 +    }
 +
 +    @Override
 +    public Repo<TestEnv> call(FateId fateId, TestEnv environment) throws 
Exception {
 +      LOG.debug("Entering call {}", fateId);
 +      try {
 +        FateITBase.inCall();
 +        return null;
 +      } finally {
 +        LOG.debug("Leaving call {}", fateId);
 +      }
 +    }
 +
 +    @Override
 +    public void undo(FateId fateId, TestEnv environment) throws Exception {
 +
 +    }
 +
 +    @Override
 +    public String getReturn() {
 +      return data + "_ret";
 +    }
 +  }
 +
 +  public static class TestOperationFails implements Repo<TestEnv> {
 +    private static final long serialVersionUID = 1L;
 +    private static final Logger LOG = 
LoggerFactory.getLogger(TestOperationFails.class);
 +    private static List<String> undoOrder = new ArrayList<>();
 +    private static final int TOTAL_NUM_OPS = 3;
 +    private int opNum;
 +    private final String opName;
 +    private final ExceptionLocation location;
 +
 +    public TestOperationFails(int opNum, ExceptionLocation location) {
 +      this.opNum = opNum;
 +      this.opName = "OP" + opNum;
 +      this.location = location;
 +    }
 +
 +    @Override
 +    public long isReady(FateId fateId, TestEnv environment) throws Exception {
 +      LOG.debug("{} {} Entered isReady()", opName, fateId);
 +      if (location == ExceptionLocation.IS_READY) {
 +        if (opNum < TOTAL_NUM_OPS) {
 +          return 0;
 +        } else {
 +          throw new Exception(opName + " " + fateId + " isReady() failed - 
this is expected");
 +        }
 +      } else {
 +        return 0;
 +      }
 +    }
 +
 +    @Override
 +    public String getName() {
 +      return getClass().getName();
 +    }
 +
 +    @Override
 +    public void undo(FateId fateId, TestEnv environment) throws Exception {
 +      LOG.debug("{} {} Entered undo()", opName, fateId);
 +      undoOrder.add(opName);
 +      undoLatch.countDown();
 +    }
 +
 +    @Override
 +    public Repo<TestEnv> call(FateId fateId, TestEnv environment) throws 
Exception {
 +      LOG.debug("{} {} Entered call()", opName, fateId);
 +      if (location == ExceptionLocation.CALL) {
 +        if (opNum < TOTAL_NUM_OPS) {
 +          return new TestOperationFails(++opNum, location);
 +        } else {
 +          throw new Exception(opName + " " + fateId + " call() failed - this 
is expected");
 +        }
 +      } else {
 +        return new TestOperationFails(++opNum, location);
 +      }
 +    }
 +
 +    @Override
 +    public String getReturn() {
 +      return "none";
 +    }
 +  }
 +
 +  /**
 +   * Test Repo that allows configuring a delay time to be returned in 
isReady().
 +   */
 +  public static class DeferredTestRepo implements Repo<TestEnv> {
 +    private static final long serialVersionUID = 1L;
 +
 +    private final String data;
 +
 +    // These are static as we don't want to serialize them and they should
 +    // be shared across all instances during the test
 +    private static final AtomicInteger executedCalls = new AtomicInteger();
 +    private static final AtomicLong delay = new AtomicLong();
 +    private static final CountDownLatch callLatch = new CountDownLatch(1);
 +
 +    public DeferredTestRepo(String data) {
 +      this.data = data;
 +    }
 +
 +    @Override
 +    public long isReady(FateId fateId, TestEnv environment) {
 +      LOG.debug("{} delayed {}", fateId, delay.get());
 +      return delay.get();
 +    }
 +
 +    @Override
 +    public String getName() {
 +      return "TestRepo_" + data;
 +    }
 +
 +    @Override
 +    public Repo<TestEnv> call(FateId fateId, TestEnv environment) throws 
Exception {
 +      callLatch.await();
 +      LOG.debug("Executing call {}, total executed {}", fateId, 
executedCalls.incrementAndGet());
 +      return null;
 +    }
 +
 +    @Override
 +    public void undo(FateId fateId, TestEnv environment) {
 +
 +    }
 +
 +    @Override
 +    public String getReturn() {
 +      return data + "_ret";
 +    }
 +  }
 +
 +  @Test
 +  @Timeout(30)
 +  public void testTransactionStatus() throws Exception {
 +    executeTest(this::testTransactionStatus);
 +  }
 +
 +  protected void testTransactionStatus(FateStore<TestEnv> store, 
ServerContext sctx)
 +      throws Exception {
 +    Fate<TestEnv> fate = initializeFate(store);
 +    try {
 +
 +      // Wait for the transaction runner to be scheduled.
 +      Thread.sleep(Fate.INITIAL_DELAY.toMillis() * 2);
 +
 +      callStarted = new CountDownLatch(1);
 +      finishCall = new CountDownLatch(1);
 +
 +      FateId fateId = fate.startTransaction();
 +      assertEquals(TStatus.NEW, getTxStatus(sctx, fateId));
 +      fate.seedTransaction(TEST_FATE_OP, fateId, new 
TestRepo("testTransactionStatus"), true,
 +          "Test Op");
 +      assertEquals(TStatus.SUBMITTED, getTxStatus(sctx, fateId));
 +      // wait for call() to be called
 +      callStarted.await();
 +      assertEquals(IN_PROGRESS, getTxStatus(sctx, fateId));
 +      // tell the op to exit the method
 +      finishCall.countDown();
 +      // Check that it transitions to SUCCESSFUL and then removed (UNKNOWN)
 +      final var sawSuccess = new AtomicBoolean(false);
 +      Wait.waitFor(() -> {
 +        TStatus s;
 +        switch (s = getTxStatus(sctx, fateId)) {
 +          case IN_PROGRESS:
 +            if (sawSuccess.get()) {
 +              fail("Should never see IN_PROGRESS after seeing SUCCESSFUL");
 +            }
 +            break;
 +          case SUCCESSFUL:
 +            // expected, but might be too quick to be detected
 +            if (sawSuccess.compareAndSet(false, true)) {
 +              LOG.debug("Saw expected transaction status change to 
SUCCESSFUL");
 +            }
 +            break;
 +          case UNKNOWN:
 +            if (!sawSuccess.get()) {
 +              LOG.debug("Never saw transaction status change to SUCCESSFUL, 
but that's okay");
 +            }
 +            return true;
 +          default:
 +            fail("Saw unexpected status: " + s);
 +        }
 +        // keep waiting for UNKNOWN
 +        return false;
 +      }, SECONDS.toMillis(30), 10);
 +    } finally {
 +      fate.shutdown(10, TimeUnit.MINUTES);
 +    }
 +  }
 +
 +  @Test
 +  public void testCancelWhileNew() throws Exception {
 +    executeTest(this::testCancelWhileNew);
 +  }
 +
 +  protected void testCancelWhileNew(FateStore<TestEnv> store, ServerContext 
sctx) throws Exception {
 +    Fate<TestEnv> fate = initializeFate(store);
 +    try {
 +
 +      // Wait for the transaction runner to be scheduled.
 +      Thread.sleep(Fate.INITIAL_DELAY.toMillis() * 2);
 +
 +      callStarted = new CountDownLatch(1);
 +      finishCall = new CountDownLatch(1);
 +
 +      FateId fateId = fate.startTransaction();
 +      LOG.debug("Starting test testCancelWhileNew with {}", fateId);
 +      assertEquals(NEW, getTxStatus(sctx, fateId));
 +      // cancel the transaction
 +      assertTrue(fate.cancel(fateId));
 +      assertTrue(
 +          FAILED_IN_PROGRESS == getTxStatus(sctx, fateId) || FAILED == 
getTxStatus(sctx, fateId));
 +      fate.seedTransaction(TEST_FATE_OP, fateId, new 
TestRepo("testCancelWhileNew"), true,
 +          "Test Op");
 +      Wait.waitFor(() -> FAILED == getTxStatus(sctx, fateId));
 +      // nothing should have run
 +      assertEquals(1, callStarted.getCount());
 +      fate.delete(fateId);
 +      assertEquals(UNKNOWN, getTxStatus(sctx, fateId));
 +    } finally {
 +      fate.shutdown(10, TimeUnit.MINUTES);
 +    }
 +  }
 +
 +  @Test
 +  public void testCancelWhileSubmittedAndRunning() throws Exception {
 +    executeTest(this::testCancelWhileSubmittedAndRunning);
 +  }
 +
 +  protected void testCancelWhileSubmittedAndRunning(FateStore<TestEnv> store, 
ServerContext sctx)
 +      throws Exception {
 +    Fate<TestEnv> fate = initializeFate(store);
 +    try {
 +
 +      // Wait for the transaction runner to be scheduled.
 +      Thread.sleep(Fate.INITIAL_DELAY.toMillis() * 2);
 +
 +      callStarted = new CountDownLatch(1);
 +      finishCall = new CountDownLatch(1);
 +
 +      FateId fateId = fate.startTransaction();
 +      LOG.debug("Starting test testCancelWhileSubmitted with {}", fateId);
 +      assertEquals(NEW, getTxStatus(sctx, fateId));
 +      fate.seedTransaction(TEST_FATE_OP, fateId, new 
TestRepo("testCancelWhileSubmittedAndRunning"),
 +          false, "Test Op");
 +      Wait.waitFor(() -> IN_PROGRESS == getTxStatus(sctx, fateId));
 +      // This is false because the transaction runner has reserved the FaTe
 +      // transaction.
 +      assertFalse(fate.cancel(fateId));
 +      callStarted.await();
 +      finishCall.countDown();
 +      Wait.waitFor(() -> IN_PROGRESS != getTxStatus(sctx, fateId));
 +      fate.delete(fateId);
 +      assertEquals(UNKNOWN, getTxStatus(sctx, fateId));
 +    } finally {
 +      fate.shutdown(10, TimeUnit.MINUTES);
 +    }
 +  }
 +
 +  @Test
 +  public void testCancelWhileInCall() throws Exception {
 +    executeTest(this::testCancelWhileInCall);
 +  }
 +
 +  protected void testCancelWhileInCall(FateStore<TestEnv> store, 
ServerContext sctx)
 +      throws Exception {
 +    Fate<TestEnv> fate = initializeFate(store);
 +    try {
 +
 +      // Wait for the transaction runner to be scheduled.
 +      Thread.sleep(Fate.INITIAL_DELAY.toMillis() * 2);
 +
 +      callStarted = new CountDownLatch(1);
 +      finishCall = new CountDownLatch(1);
 +
 +      FateId fateId = fate.startTransaction();
 +      LOG.debug("Starting test testCancelWhileInCall with {}", fateId);
 +      assertEquals(NEW, getTxStatus(sctx, fateId));
 +      fate.seedTransaction(TEST_FATE_OP, fateId, new 
TestRepo("testCancelWhileInCall"), true,
 +          "Test Op");
 +      assertEquals(SUBMITTED, getTxStatus(sctx, fateId));
 +      // wait for call() to be called
 +      callStarted.await();
 +      // cancel the transaction
 +      assertFalse(fate.cancel(fateId));
 +      finishCall.countDown();
 +    } finally {
 +      fate.shutdown(10, TimeUnit.MINUTES);
 +    }
 +
 +  }
 +
 +  @Test
 +  @Timeout(30)
 +  public void testDeferredOverflow() throws Exception {
 +    // Set a maximum deferred map size of 10 transactions so that when the 
11th
 +    // is seen the Fate store should clear the deferred map and mark
 +    // the flag as overflow so that all the deferred transactions will be run
 +    executeTest(this::testDeferredOverflow, 10, 
AbstractFateStore.DEFAULT_FATE_ID_GENERATOR);
 +  }
 +
 +  protected void testDeferredOverflow(FateStore<TestEnv> store, ServerContext 
sctx)
 +      throws Exception {
 +    Fate<TestEnv> fate = initializeFate(store);
 +    try {
 +
 +      // Wait for the transaction runner to be scheduled.
 +      Thread.sleep(Fate.INITIAL_DELAY.toMillis() * 2);
 +
 +      DeferredTestRepo.executedCalls.set(0);
 +      // Initialize the repo to have a delay of 30 seconds
 +      // so it will be deferred when submitted
 +      DeferredTestRepo.delay.set(30000);
 +
 +      Set<FateId> transactions = new HashSet<>();
 +
 +      // Start by creating 10 transactions that are all deferred which should
 +      // fill up the deferred map with all 10 as we set the max deferred limit
 +      // to only allow 10 transactions
 +      for (int i = 0; i < 10; i++) {
 +        submitDeferred(fate, sctx, transactions);
 +      }
 +
 +      // Verify all 10 are deferred in the map.
 +      // We should not be in an overflow state yet
 +      Wait.waitFor(() -> store.getDeferredCount() == 10);
 +      assertFalse(store.isDeferredOverflow());
 +
 +      // After verifying all 10 are deferred, submit another 10
 +      // which should trigger an overflow. We are blocking in the
 +      // call method of DeferredTestRepo at this point using a countdown
 +      // latch to prevent fate executor from running early and clearing
 +      // the deferred overflow flag before we can check it below
 +      for (int i = 0; i < 10; i++) {
 +        submitDeferred(fate, sctx, transactions);
 +      }
 +      // Verify deferred overflow is true and map is now empty
 +      Wait.waitFor(() -> store.getDeferredCount() == 0);
 +      Wait.waitFor(store::isDeferredOverflow);
 +
 +      // Set the delay to 0 and countdown so we will process the
 +      // call method in the repos. We need to change the delay because
 +      // due to the async nature of Fate it's possible some of the submitted
 +      // repos previously wouldn't be processed in the first batch until
 +      // after the flag was cleared which would trigger a long delay again
 +      DeferredTestRepo.delay.set(0);
 +      DeferredTestRepo.callLatch.countDown();
 +
 +      // Verify the flag was cleared and everything ran
 +      Wait.waitFor(() -> !store.isDeferredOverflow());
 +      Wait.waitFor(() -> DeferredTestRepo.executedCalls.get() == 20);
 +
 +      // Verify all 20 unique transactions finished
 +      Wait.waitFor(() -> {
 +        transactions.removeIf(fateId -> getTxStatus(sctx, fateId) == UNKNOWN);
 +        return transactions.isEmpty();
 +      });
 +
 +    } finally {
 +      fate.shutdown(10, TimeUnit.MINUTES);
 +    }
 +  }
 +
 +  @Test
 +  @Timeout(30)
 +  public void testRepoFails() throws Exception {
 +    // Set a maximum deferred map size of 10 transactions so that when the 
11th
 +    // is seen the Fate store should clear the deferred map and mark
 +    // the flag as overflow so that all the deferred transactions will be run
 +    executeTest(this::testRepoFails, 10, 
AbstractFateStore.DEFAULT_FATE_ID_GENERATOR);
 +  }
 +
 +  protected void testRepoFails(FateStore<TestEnv> store, ServerContext sctx) 
throws Exception {
 +    /*
 +     * This test ensures that when an exception occurs in a Repo's call() or 
isReady() methods, that
 +     * undo() will be called back up the chain of Repo's and in the correct 
order. The test works as
 +     * follows: 1) Repo1 is called and returns Repo2, 2) Repo2 is called and 
returns Repo3, 3) Repo3
 +     * is called and throws an exception (in call() or isReady()). It is then 
expected that: 1)
 +     * undo() is called on Repo3, 2) undo() is called on Repo2, 3) undo() is 
called on Repo1
 +     */
 +    Fate<TestEnv> fate = initializeFate(store);
 +    try {
 +
 +      // Wait for the transaction runner to be scheduled.
 +      Thread.sleep(Fate.INITIAL_DELAY.toMillis() * 2);
 +
 +      List<String> expectedUndoOrder = List.of("OP3", "OP2", "OP1");
 +      /*
 +       * Test exception in call()
 +       */
 +      TestOperationFails.undoOrder = new ArrayList<>();
 +      undoLatch = new CountDownLatch(TestOperationFails.TOTAL_NUM_OPS);
 +      FateId fateId = fate.startTransaction();
 +      assertEquals(NEW, getTxStatus(sctx, fateId));
 +      fate.seedTransaction(TEST_FATE_OP, fateId, new TestOperationFails(1, 
ExceptionLocation.CALL),
 +          false, "Test Op Fails");
 +      // Wait for all the undo() calls to complete
 +      undoLatch.await();
 +      assertEquals(expectedUndoOrder, TestOperationFails.undoOrder);
 +      assertEquals(FAILED, fate.waitForCompletion(fateId));
 +      assertTrue(fate.getException(fateId).getMessage().contains("call() 
failed"));
 +      /*
 +       * Test exception in isReady()
 +       */
 +      TestOperationFails.undoOrder = new ArrayList<>();
 +      undoLatch = new CountDownLatch(TestOperationFails.TOTAL_NUM_OPS);
 +      fateId = fate.startTransaction();
 +      assertEquals(NEW, getTxStatus(sctx, fateId));
 +      fate.seedTransaction(TEST_FATE_OP, fateId,
 +          new TestOperationFails(1, ExceptionLocation.IS_READY), false, "Test 
Op Fails");
 +      // Wait for all the undo() calls to complete
 +      undoLatch.await();
 +      assertEquals(expectedUndoOrder, TestOperationFails.undoOrder);
 +      assertEquals(FAILED, fate.waitForCompletion(fateId));
 +      assertTrue(fate.getException(fateId).getMessage().contains("isReady() 
failed"));
 +    } finally {
 +      fate.shutdown(10, TimeUnit.MINUTES);
 +    }
 +  }
 +
 +  @Test
 +  @Timeout(30)
 +  public void testNoWriteAfterDelete() throws Exception {
 +    executeTest(this::testNoWriteAfterDelete);
 +  }
 +
 +  protected void testNoWriteAfterDelete(FateStore<TestEnv> store, 
ServerContext sctx)
 +      throws Exception {
 +    final FateId fateId = store.create();
 +    final Repo<TestEnv> repo = new TestRepo("testNoWriteAfterDelete");
 +
 +    var txStore = store.reserve(fateId);
 +
 +    // all write ops should be ok after reservation
 +    assertDoesNotThrow(() -> txStore.push(repo));
 +    assertDoesNotThrow(() -> 
txStore.setStatus(ReadOnlyFateStore.TStatus.SUCCESSFUL));
 +    assertDoesNotThrow(txStore::pop);
 +    assertDoesNotThrow(() -> txStore.setTransactionInfo(Fate.TxInfo.FATE_OP, 
TEST_FATE_OP));
 +    assertDoesNotThrow(txStore::delete);
 +
 +    // test that all write ops result in an exception since the tx has been 
deleted
 +    assertThrows(Exception.class, () -> txStore.push(repo));
 +    assertThrows(Exception.class, () -> 
txStore.setStatus(ReadOnlyFateStore.TStatus.SUCCESSFUL));
 +    assertThrows(Exception.class, txStore::pop);
 +    assertThrows(Exception.class,
 +        () -> txStore.setTransactionInfo(Fate.TxInfo.FATE_OP, TEST_FATE_OP));
 +    assertThrows(Exception.class, txStore::delete);
 +  }
 +
 +  private void submitDeferred(Fate<TestEnv> fate, ServerContext sctx, 
Set<FateId> transactions) {
 +    FateId fateId = fate.startTransaction();
 +    transactions.add(fateId);
 +    assertEquals(TStatus.NEW, getTxStatus(sctx, fateId));
 +    fate.seedTransaction(TEST_FATE_OP, fateId, new 
DeferredTestRepo("testDeferredOverflow"), true,
 +        "Test Op");
 +    assertEquals(TStatus.SUBMITTED, getTxStatus(sctx, fateId));
 +  }
 +
 +  protected Fate<TestEnv> initializeFate(FateStore<TestEnv> store) {
 +    return new Fate<>(new TestEnv(), store, false, r -> r + "",
 +        FateTestUtil.updateFateConfig(new ConfigurationCopy(), 1, 
"AllFateOps"),
 +        new ScheduledThreadPoolExecutor(2));
 +  }
 +
 +  protected abstract TStatus getTxStatus(ServerContext sctx, FateId fateId);
 +
++  @Test
++  @Timeout(60)
++  public void testShutdownDoesNotFailTx() throws Exception {
++    executeTest(this::testShutdownDoesNotFailTx);
++  }
++
++  protected void testShutdownDoesNotFailTx(FateStore<TestEnv> store, 
ServerContext sctx)
++      throws Exception {
++    ConfigurationCopy config = new ConfigurationCopy();
++    config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
++
++    Fate<TestEnv> fate = initializeFate(store);
++
++    // Wait for the transaction runner to be scheduled.
++    UtilWaitThread.sleep(3000);
++
++    callStarted = new CountDownLatch(1);
++    finishCall = new CountDownLatch(1);
++
++    FateId txid = fate.startTransaction();
++    assertEquals(TStatus.NEW, getTxStatus(sctx, txid));
++
++    fate.seedTransaction(TEST_FATE_OP, txid, new 
TestRepo("testShutdownDoesNotFailTx"), true,
++        "Test Op");
++
++    assertEquals(TStatus.SUBMITTED, getTxStatus(sctx, txid));
++
++    // wait for call() to be called
++    callStarted.await();
++    assertEquals(IN_PROGRESS, getTxStatus(sctx, txid));
++
++    // shutdown fate
++    fate.shutdown(0, SECONDS);
++
++    // wait for the op, blocked in call(), to be interrupted by the shutdown
++    Wait.waitFor(() -> interruptedException.get() != null);
++    interruptedException.set(null);
++
++    // restart fate
++    assertEquals(IN_PROGRESS, getTxStatus(sctx, txid));
++    fate = initializeFate(store);
++    assertEquals(IN_PROGRESS, getTxStatus(sctx, txid));
++
++    // Restarting the transaction runners will retry the in-progress
++    // transaction. Reset the CountDownLatches to confirm.
++    callStarted = new CountDownLatch(1);
++    finishCall = new CountDownLatch(1);
++
++    callStarted.await();
++    assertEquals(IN_PROGRESS, getTxStatus(sctx, txid));
++    finishCall.countDown();
++
++    // This should complete normally, cleaning up the tx and deleting it from 
ZK
++    TStatus status = getTxStatus(sctx, txid);
++    while (status != TStatus.UNKNOWN) {
++      Thread.sleep(100);
++      status = getTxStatus(sctx, txid);
++    }
++    assertNull(interruptedException.get());
++  }
++
 +  private static void inCall() throws InterruptedException {
 +    // signal that call started
 +    callStarted.countDown();
-     // wait for the signal to exit the method
-     finishCall.await();
++    try {
++      // wait for the signal to exit the method
++      finishCall.await();
++    } catch (InterruptedException e) {
++      LOG.debug("InterruptedException occurred inCall.");
++      interruptedException.set(e);
++      throw e;
++    }
 +  }
 +}
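
[Editorial note, not part of the patch] The new testShutdownDoesNotFailTx above coordinates
the test thread and the FATE worker with two CountDownLatches and an AtomicReference that
records the interrupt produced by fate.shutdown(0, SECONDS). A standalone sketch of that
handshake; the class name is invented for illustration.

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.atomic.AtomicReference;

    class LatchHandshakeExample {
      public static void main(String[] args) throws Exception {
        CountDownLatch callStarted = new CountDownLatch(1);
        CountDownLatch finishCall = new CountDownLatch(1);
        AtomicReference<Throwable> interrupted = new AtomicReference<>();

        Thread worker = new Thread(() -> {
          callStarted.countDown();   // signal that call() has started, as inCall() does
          try {
            finishCall.await();      // block until the test releases the op
          } catch (InterruptedException e) {
            interrupted.set(e);      // record the interrupt so the test can assert on it
          }
        });
        worker.start();

        callStarted.await();         // wait until the worker is "in call"
        worker.interrupt();          // stands in for shutdownNow() interrupting the runner thread
        worker.join();
        System.out.println(interrupted.get() != null); // prints true
      }
    }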
