This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit e9de98b240ed724e38d9630fc0577bc044e4e581 Merge: 486cddb356 552c7a15b6 Author: Dave Marion <[email protected]> AuthorDate: Thu Dec 18 22:06:04 2025 +0000 Merge branch '2.1' .../java/org/apache/accumulo/core/fate/Fate.java | 9 ++- .../apache/accumulo/core/fate/FateExecutor.java | 62 ++++++++++++++++-- .../org/apache/accumulo/test/fate/FateITBase.java | 76 +++++++++++++++++++++- 3 files changed, 136 insertions(+), 11 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/fate/Fate.java index e928cbd154,684fd69cc0..651e1ca838 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@@ -204,32 -178,38 +204,33 @@@ public class Fate<T> } } } - } - /** - * The Hadoop Filesystem registers a java shutdown hook that closes the file system. This can - * cause threads to get spurious IOException. If this happens, instead of failing a FATE - * transaction just wait for process to die. When the manager start elsewhere the FATE - * transaction can resume. - */ - private void blockIfHadoopShutdown(long tid, Exception e) { - if (ShutdownUtil.isShutdownInProgress()) { - String tidStr = FateTxId.formatTid(tid); - - if (e instanceof AcceptableException) { - log.debug("Ignoring exception possibly caused by Hadoop Shutdown hook. {} ", tidStr, e); - } else if (isIOException(e)) { - log.info("Ignoring exception likely caused by Hadoop Shutdown hook. {} ", tidStr, e); - } else { - // sometimes code will catch an IOException caused by the hadoop shutdown hook and throw - // another exception without setting the cause. - log.warn("Ignoring exception possibly caused by Hadoop Shutdown hook. {} ", tidStr, e); + // replacement task: at this point, the existing FateExecutors that were invalidated by the + // config changes have started shutdown or finished shutdown. Now create any new replacement + // FateExecutors needed + for (var poolConfig : poolConfigs.entrySet()) { + Set<FateOperation> fateOps = poolConfig.getKey(); + Map.Entry<String,Integer> fateExecNameAndPoolSize = poolConfig.getValue(); + String fateExecutorName = fateExecNameAndPoolSize.getKey(); + int poolSize = fateExecNameAndPoolSize.getValue(); + synchronized (fateExecutors) { + if (fateExecutors.stream().noneMatch( + fe -> fe.getFateOps().equals(fateOps) && fe.getName().equals(fateExecutorName))) { - log.debug("[{}] Adding FateExecutor for {}", store.type(), fateOps); ++ log.debug("[{}] Adding FateExecutor for {} with {} threads", store.type(), fateOps, ++ poolSize); + fateExecutors.add( + new FateExecutor<>(Fate.this, environment, fateOps, poolSize, fateExecutorName)); + } } + } - while (true) { - // Nothing is going to work well at this point, so why even try. Just wait for the end, - // preventing this FATE thread from processing further work and likely failing. - UtilWaitThread.sleepUninterruptibly(1, MINUTES); + // resize task: see description for FateExecutor.resizeFateExecutor + synchronized (fateExecutors) { + for (var fateExecutor : fateExecutors) { + if (fateExecutor.isShutdown()) { + continue; + } + fateExecutor.resizeFateExecutor(poolConfigs, idleCheckIntervalMillis); } } } @@@ -498,98 -467,24 +499,100 @@@ } /** - * Flags that FATE threadpool to clear out and end. Does not actively stop running FATE processes. + * Lists transctions for a given fate key type. 
*/ - public void shutdown(boolean wait) { - log.info("Shutdown called on Fate, waiting: {}", wait); + public Stream<FateKey> list(FateKey.FateKeyType type) { + return store.list(type); + } + + /** + * Initiates shutdown of background threads that run fate operations and cleanup fate data and + * optionally waits on them. Leaves the fate object in a state where it can still update and read + * fate data, like add a new fate operation or get the status of an existing fate operation. + */ + public void shutdown(long timeout, TimeUnit timeUnit) { - log.info("Shutting down {} FATE", store.type()); - ++ log.info("Shutting down {} FATE, waiting: {} seconds", store.type(), ++ TimeUnit.SECONDS.convert(timeout, timeUnit)); + // important this is set before shutdownNow is called as the background + // threads will check this to see if shutdown related errors should be ignored. - keepRunning.set(false); - if (executor == null) { - return; + if (keepRunning.compareAndSet(true, false)) { + synchronized (fateExecutors) { + for (var fateExecutor : fateExecutors) { + fateExecutor.initiateShutdown(); + } + } + if (deadResCleanerExecutor != null) { + deadResCleanerExecutor.shutdown(); + } + fatePoolsWatcherFuture.cancel(false); } - executor.shutdownNow(); - if (wait) { - while (!executor.isTerminated()) { - try { - executor.awaitTermination(1, SECONDS); - } catch (InterruptedException e) { - throw new IllegalStateException(e); + + if (timeout > 0) { + long start = System.nanoTime(); + try { + waitForAllFateExecShutdown(start, timeout, timeUnit); + waitForDeadResCleanerShutdown(start, timeout, timeUnit); + + if (anyFateExecutorIsAlive() || deadResCleanerIsAlive()) { + log.warn( + "Waited for {}ms for all fate {} background threads to stop, but some are still running. " + + "fate executor threads:{} dead reservation cleaner thread:{} ", + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start), store.type(), + anyFateExecutorIsAlive(), deadResCleanerIsAlive()); } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + // interrupt the background threads + synchronized (fateExecutors) { + var fateExecutorsIter = fateExecutors.iterator(); + while (fateExecutorsIter.hasNext()) { + var fateExecutor = fateExecutorsIter.next(); + fateExecutor.shutdownNow(); + fateExecutor.getIdleCountHistory().clear(); + fateExecutorsIter.remove(); + } + } + if (deadResCleanerExecutor != null) { + deadResCleanerExecutor.shutdownNow(); + } + } + + /** + * Initiates shutdown of all fate threads and prevents reads and updates of fates persisted data. 
+ */ + public void close() { + shutdown(0, SECONDS); + store.close(); + } + + private boolean anyFateExecutorIsAlive() { + synchronized (fateExecutors) { + return fateExecutors.stream().anyMatch(FateExecutor::isAlive); + } + } + + private boolean deadResCleanerIsAlive() { + return deadResCleanerExecutor != null && !deadResCleanerExecutor.isTerminated(); + } + + private void waitForAllFateExecShutdown(long start, long timeout, TimeUnit timeUnit) + throws InterruptedException { + synchronized (fateExecutors) { + for (var fateExecutor : fateExecutors) { + fateExecutor.waitForShutdown(start, timeout, timeUnit); + } + } + } + + private void waitForDeadResCleanerShutdown(long start, long timeout, TimeUnit timeUnit) + throws InterruptedException { + while (((System.nanoTime() - start) < timeUnit.toNanos(timeout)) && deadResCleanerIsAlive()) { + if (deadResCleanerExecutor != null && !deadResCleanerExecutor.awaitTermination(1, SECONDS)) { + log.debug("Fate {} is waiting for dead reservation cleaner thread to terminate", + store.type()); } } } diff --cc core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java index 5797888043,0000000000..bbf0bcb81e mode 100644,000000..100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java @@@ -1,619 -1,0 +1,669 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.core.fate; + +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED_IN_PROGRESS; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.IN_PROGRESS; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUBMITTED; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUCCESSFUL; +import static org.apache.accumulo.core.util.ShutdownUtil.isIOException; + +import java.time.Duration; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedTransferQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TransferQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.fate.Fate.TxInfo; +import org.apache.accumulo.core.fate.FateStore.FateTxStore; +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; +import org.apache.accumulo.core.util.ShutdownUtil; +import org.apache.accumulo.core.util.Timer; +import org.apache.accumulo.core.util.threads.ThreadPoolNames; +import org.apache.accumulo.core.util.threads.ThreadPools; +import org.apache.accumulo.core.util.threads.Threads; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * Handles finding and working on FATE work. Only finds/works on fate operations that it is assigned + * to work on defined by 'fateOps'. These executors may be stopped and new ones started throughout + * FATEs life, depending on changes to {@link Property#MANAGER_FATE_USER_CONFIG} and + * {@link Property#MANAGER_FATE_META_CONFIG}. + */ +public class FateExecutor<T> { + private static final Logger log = LoggerFactory.getLogger(FateExecutor.class); + private final Logger runnerLog = LoggerFactory.getLogger(TransactionRunner.class); + + private final T environment; + private final Fate<T> fate; + private final Thread workFinder; + private final TransferQueue<FateId> workQueue; + private final AtomicInteger idleWorkerCount; + private final String name; + private final String poolName; + private final ThreadPoolExecutor transactionExecutor; + private final Set<TransactionRunner> runningTxRunners; + private final Set<Fate.FateOperation> fateOps; + private final ConcurrentLinkedQueue<Integer> idleCountHistory = new ConcurrentLinkedQueue<>(); + private final FateExecutorMetrics<T> fateExecutorMetrics; + + public FateExecutor(Fate<T> fate, T environment, Set<Fate.FateOperation> fateOps, int poolSize, + String name) { + final FateInstanceType type = fate.getStore().type(); + final String typeStr = type.name().toLowerCase(); + final String poolName = + ThreadPoolNames.MANAGER_FATE_POOL_PREFIX.poolName + typeStr + "." + name; + final String workFinderThreadName = "fate.work.finder." 
+ typeStr + "." + name; + + this.fate = fate; + this.environment = environment; + this.fateOps = Collections.unmodifiableSet(fateOps); + this.workQueue = new LinkedTransferQueue<>(); + this.runningTxRunners = Collections.synchronizedSet(new HashSet<>()); + this.name = name; + this.poolName = poolName; + this.transactionExecutor = ThreadPools.getServerThreadPools().getPoolBuilder(poolName) + .numCoreThreads(poolSize).build(); + this.idleWorkerCount = new AtomicInteger(0); + this.fateExecutorMetrics = + new FateExecutorMetrics<T>(type, poolName, runningTxRunners, idleWorkerCount); + + this.workFinder = Threads.createCriticalThread(workFinderThreadName, new WorkFinder()); + this.workFinder.start(); + } + + /** + * resize the pool to match the config as necessary and submit new TransactionRunners if the pool + * grew, stop TransactionRunners if the pool shrunk, and potentially suggest resizing the pool if + * the load is consistently high. + */ + protected void resizeFateExecutor( + Map<Set<Fate.FateOperation>,Map.Entry<String,Integer>> poolConfigs, + long idleCheckIntervalMillis) { + final int configured = poolConfigs.get(fateOps).getValue(); + ThreadPools.resizePool(transactionExecutor, () -> configured, poolName); + synchronized (runningTxRunners) { + final int running = runningTxRunners.size(); + final int needed = configured - running; + log.trace("resizing pools configured:{} running:{} needed:{} fateOps:{}", configured, running, + needed, fateOps); + if (needed > 0) { + // If the pool grew, then ensure that there is a TransactionRunner for each thread + for (int i = 0; i < needed; i++) { + final TransactionRunner tr = new TransactionRunner(); + try { + runningTxRunners.add(tr); + transactionExecutor.execute(tr); + } catch (RejectedExecutionException e) { + runningTxRunners.remove(tr); + // RejectedExecutionException could be shutting down + if (transactionExecutor.isShutdown()) { + // The exception is expected in this case, no need to spam the logs. + log.trace("Expected error adding transaction runner to FaTE executor pool. " + + "The pool is shutdown.", e); + } else { + // This is bad, FaTE may no longer work! + log.error("Unexpected error adding transaction runner to FaTE executor pool.", e); + } + break; + } + } + idleCountHistory.clear(); + } else if (needed < 0) { + // If we need the pool to shrink, then ensure excess TransactionRunners are safely + // stopped. + // Flag the necessary number of TransactionRunners to safely stop when they are done + // work on a transaction. + int numFlagged = (int) runningTxRunners.stream() + .filter(FateExecutor.TransactionRunner::isFlaggedToStop).count(); + int numToStop = -1 * (numFlagged + needed); + for (var runner : runningTxRunners) { + if (numToStop <= 0) { + break; + } + if (runner.flagStop()) { + log.trace("Flagging a TransactionRunner to stop..."); + numToStop--; + } + } + } else { + // The pool size did not change, but should it based on idle Fate threads? Maintain + // count of the last X minutes of idle Fate threads. If zero 95% of the time, then + // suggest that the pool size be increased or the fate ops assigned to that pool be + // split into separate pools. 
+ final long interval = + Math.min(60, TimeUnit.MILLISECONDS.toMinutes(idleCheckIntervalMillis)); + var fateConfigProp = Fate.getFateConfigProp(fate.getStore().type()); + + if (interval == 0) { + idleCountHistory.clear(); + } else { + if (idleCountHistory.size() >= interval * 2) { // this task runs every 30s + int zeroFateThreadsIdleCount = 0; + for (Integer idleConsumerCount : idleCountHistory) { + if (idleConsumerCount == 0) { + zeroFateThreadsIdleCount++; + } + } + boolean needMoreThreads = + (zeroFateThreadsIdleCount / (double) idleCountHistory.size()) >= 0.95; + if (needMoreThreads) { + fate.getNeedMoreThreadsWarnCount().incrementAndGet(); + log.warn( + "All {} Fate threads working on the fate ops {} appear to be busy for " + + "the last {} minutes. Consider increasing the value for the " + + "entry in the property {} or splitting the fate ops across " + + "multiple entries/pools.", + fate.getStore().type(), fateOps, interval, fateConfigProp.getKey()); + // Clear the history so that we don't log for interval minutes. + idleCountHistory.clear(); + } else { + while (idleCountHistory.size() >= interval * 2) { + idleCountHistory.remove(); + } + } + } + idleCountHistory.add(getIdleWorkerCount()); + } + } + } + } + + protected String getName() { + return name; + } + + private int getIdleWorkerCount() { + // This could call workQueue.getWaitingConsumerCount() if other code use poll with timeout + return idleWorkerCount.get(); + } + + /** + * @return the number of currently running transaction runners + */ + protected int getNumRunningTxRunners() { + return runningTxRunners.size(); + } + + protected Set<Fate.FateOperation> getFateOps() { + return fateOps; + } + + public FateExecutorMetrics<T> getFateExecutorMetrics() { + return fateExecutorMetrics; + } + + /** + * Initiates the shutdown of this FateExecutor. This means the pool executing TransactionRunners + * will no longer accept new TransactionRunners, the currently running TransactionRunners will + * terminate after they are done with their current transaction, if applicable, the work finder is + * shutdown, and the metrics created for this FateExecutor are removed from the registry (if + * metrics were enabled). {@link #isShutdown()} returns true after this is called. + */ + protected void initiateShutdown() { + log.debug("Initiated shutdown {}", fateOps); + transactionExecutor.shutdown(); + synchronized (runningTxRunners) { + runningTxRunners.forEach(TransactionRunner::flagStop); + } + fateExecutorMetrics.clearMetrics(); + // work finder will terminate since this.isShutdown() is true + } + + /** + * @return true if {@link #initiateShutdown()} has previously been called on this FateExecutor. + * The FateExecutor may or may not still have running threads. 
To check that, see + * {@link #isAlive()} + */ + protected boolean isShutdown() { + return transactionExecutor.isShutdown(); + } + + protected void shutdownNow() { + transactionExecutor.shutdownNow(); + } + + protected void waitForShutdown(long start, long timeout, TimeUnit timeUnit) + throws InterruptedException { + if (timeout > 0) { + while (((System.nanoTime() - start) < timeUnit.toNanos(timeout)) && isAlive()) { + if (!transactionExecutor.awaitTermination(1, SECONDS)) { + log.debug("Fate {} is waiting for {} worker threads for fate ops {} to terminate", + fate.getStore().type(), runningTxRunners.size(), fateOps); + continue; + } + + workFinder.join(1_000); + if (workFinder.isAlive()) { + log.debug("Fate {} is waiting for work finder thread for fate ops {} to terminate", + fate.getStore().type(), fateOps); + workFinder.interrupt(); + } + } + + if (isAlive()) { + log.warn( + "Waited for {}ms for the {} fate executor operating on fate ops {} to stop, but it" + + " is still running. Summary of run state of its threads: work finder:{}" + + " transaction executor:{}", + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start), fate.getStore().type(), + fateOps, workFinder.isAlive(), !transactionExecutor.isTerminated()); + } + } + } + + /** + * This fate executor is defined as being alive if any of its threads are running + */ + protected boolean isAlive() { + return !transactionExecutor.isTerminated() || workFinder.isAlive(); + } + + protected ConcurrentLinkedQueue<Integer> getIdleCountHistory() { + return idleCountHistory; + } + + /** + * A single thread that finds transactions to work on and queues them up. Do not want each worker + * thread going to the store and looking for work as it would place more load on the store. + */ + private class WorkFinder implements Runnable { + + @Override + public void run() { + while (fate.getKeepRunning().get() && !isShutdown()) { + try { + fate.getStore().runnable(fate.getKeepRunning(), fateIdStatus -> { + // The FateId with the fate operation 'fateOp' is workable by this FateExecutor if + // 1) This FateExecutor is assigned to work on 'fateOp' ('fateOp' is in 'fateOps') + // 2) The transaction was cancelled while NEW. This is an edge case that needs to be + // handled since this is the only case where we will have a runnable transaction + // that doesn't have a name. We allow any FateExecutor to work on this since this case + // should be rare and won't put much load on any one FateExecutor + var status = fateIdStatus.getStatus(); + var fateOp = fateIdStatus.getFateOperation().orElse(null); + if ((fateOp != null && fateOps.contains(fateOp)) + || txCancelledWhileNew(status, fateOp)) { + while (fate.getKeepRunning().get() && !isShutdown()) { + try { + // The reason for calling transfer instead of queueing is avoid rescanning the + // storage layer and adding the same thing over and over. For example if all + // threads were busy, the queue size was 100, and there are three runnable things + // in the store. Do not want to keep scanning the store adding those same 3 + // runnable things until the queue is full. 
+ if (workQueue.tryTransfer(fateIdStatus.getFateId(), 100, MILLISECONDS)) { + break; + } + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + } + } + }); + } catch (Exception e) { + log.warn("Unexpected failure while attempting to find work for fate", e); + workQueue.clear(); + } + } + + log.debug( + "FATE work finder for ops {} is gracefully exiting: either FATE is " + + "being shutdown ({}) and therefore all FATE threads are being shutdown or the " + + "FATE threads for the specific ops are being shutdown (due to FATE shutdown, " + + "or due to FATE config changes) ({})", + fateOps, !fate.getKeepRunning().get(), isShutdown()); + } + + private boolean txCancelledWhileNew(TStatus status, Fate.FateOperation fateOp) { + if (fateOp == null) { + // The only time a transaction should be runnable and not have a fate operation is if + // it was cancelled while NEW (and now is FAILED_IN_PROGRESS) + Preconditions.checkState(status == FAILED_IN_PROGRESS); + return true; + } + return false; + } + } + + protected class TransactionRunner implements Runnable { + + // used to signal a TransactionRunner to stop in the case where there are too many running + // i.e., + // 1. the property for the pool size decreased so we have to stop excess TransactionRunners + // or + // 2. this FateExecutor is no longer valid from config changes so we need to shutdown this + // FateExecutor + private final AtomicBoolean stop = new AtomicBoolean(false); + private volatile Long threadId = null; + + private Optional<FateTxStore<T>> reserveFateTx() throws InterruptedException { + idleWorkerCount.getAndIncrement(); + try { + while (fate.getKeepRunning().get() && !stop.get()) { + // Because of JDK-8301341 can not use poll w/ timeout until JDK 21+ + FateId unreservedFateId = workQueue.poll(); + + if (unreservedFateId == null) { + Thread.sleep(1); + continue; + } + var optionalopStore = fate.getStore().tryReserve(unreservedFateId); + if (optionalopStore.isPresent()) { + return optionalopStore; + } + } + } finally { + idleWorkerCount.decrementAndGet(); + } + + return Optional.empty(); + } + ++ private boolean isInterruptedException(Throwable e) { ++ if (e == null) { ++ return false; ++ } ++ ++ if (e instanceof InterruptedException) { ++ return true; ++ } ++ ++ for (Throwable suppressed : e.getSuppressed()) { ++ if (isInterruptedException(suppressed)) { ++ return true; ++ } ++ } ++ ++ return isInterruptedException(e.getCause()); ++ } ++ + @Override + public void run() { + runnerLog.trace("A TransactionRunner is starting for {} {} ", fate.getStore().type(), + fateOps); + threadId = Thread.currentThread().getId(); + try { + while (fate.getKeepRunning().get() && !isShutdown() && !stop.get()) { + FateTxStore<T> txStore = null; + ExecutionState state = new ExecutionState(); + try { + var optionalopStore = reserveFateTx(); + if (optionalopStore.isPresent()) { + txStore = optionalopStore.orElseThrow(); + } else { + continue; + } + state.status = txStore.getStatus(); + state.op = txStore.top(); + runnerLog.trace("Processing FATE transaction {} id: {} status: {}", + state.op == null ? null : state.op.getName(), txStore.getID(), state.status); + if (state.status == FAILED_IN_PROGRESS) { + processFailed(txStore, state.op); + } else if (state.status == SUBMITTED || state.status == IN_PROGRESS) { + try { + execute(txStore, state); ++ // It's possible that a Fate operation impl ++ // may not do the right thing with an ++ // InterruptedException. 
++ if (Thread.currentThread().isInterrupted()) { ++ throw new InterruptedException("Fate Transaction Runner thread interrupted"); ++ } + if (state.op != null && state.deferTime != 0) { + // The current op is not ready to execute + continue; + } + } catch (StackOverflowException e) { + // the op that failed to push onto the stack was never executed, so no need to undo + // it just transition to failed and undo the ops that executed + transitionToFailed(txStore, e); + continue; + } catch (Exception e) { - blockIfHadoopShutdown(txStore.getID(), e); - transitionToFailed(txStore, e); - continue; ++ if (!isInterruptedException(e)) { ++ blockIfHadoopShutdown(txStore.getID(), e); ++ transitionToFailed(txStore, e); ++ continue; ++ } else { ++ if (fate.getKeepRunning().get()) { ++ throw e; ++ } else { ++ // If we are shutting down then Fate.shutdown was called ++ // and ExecutorService.shutdownNow was called resulting ++ // in this exception. We will exit at the top of the loop. ++ Thread.interrupted(); ++ continue; ++ } ++ } + } + + if (state.op == null) { + // transaction is finished + String ret = state.prevOp.getReturn(); + if (ret != null) { + txStore.setTransactionInfo(TxInfo.RETURN_VALUE, ret); + } + txStore.setStatus(SUCCESSFUL); + doCleanUp(txStore); + } + } + } catch (Exception e) { + String name = state.op == null ? null : state.op.getName(); + FateId txid = txStore == null ? null : txStore.getID(); - runnerLog.error( - "Uncaught exception in FATE runner thread processing {} id: {} status: {}", name, - txid, state.status, e); ++ if (isInterruptedException(e)) { ++ if (fate.getKeepRunning().get()) { ++ runnerLog.error( ++ "Uncaught InterruptedException in FATE runner thread processing {} id: {} status: {}", ++ name, txid, state.status, e); ++ } else { ++ // If we are shutting down then Fate.shutdown was called ++ // and ExecutorService.shutdownNow was called resulting ++ // in this exception. We will exit at the top of the loop, ++ // so continue this loop iteration normally. ++ Thread.interrupted(); ++ } ++ } else { ++ runnerLog.error( ++ "Uncaught exception in FATE runner thread processing {} id: {} status: {}", name, ++ txid, state.status, e); ++ } + } finally { + if (txStore != null) { + if (runnerLog.isTraceEnabled()) { + String name = state.op == null ? 
null : state.op.getName(); + runnerLog.trace("Completed FATE transaction {} id: {} status: {}", name, + txStore.getID(), state.status); + } + txStore.unreserve(Duration.ofMillis(state.deferTime)); + } + } + } + } finally { + log.trace("A TransactionRunner is exiting for {} {}", fate.getStore().type(), fateOps); + Preconditions.checkState(runningTxRunners.remove(this)); + threadId = null; + } + } + + private class ExecutionState { + Repo<T> prevOp = null; + Repo<T> op = null; + long deferTime = 0; + TStatus status; + } + + // Executes as many steps of a fate operation as possible + private void execute(final FateTxStore<T> txStore, final ExecutionState state) + throws Exception { + while (state.op != null && state.deferTime == 0) { + state.deferTime = executeIsReady(txStore.getID(), state.op); + + if (state.deferTime == 0) { + if (state.status == SUBMITTED) { + txStore.setStatus(IN_PROGRESS); + state.status = IN_PROGRESS; + } + + state.prevOp = state.op; + state.op = executeCall(txStore.getID(), state.op); + + if (state.op != null) { + // persist the completion of this step before starting to run the next so in the case of + // process death the completed steps are not rerun + txStore.push(state.op); + } + } + } + } + + /** + * The Hadoop Filesystem registers a java shutdown hook that closes the file system. This can + * cause threads to get spurious IOException. If this happens, instead of failing a FATE + * transaction just wait for process to die. When the manager start elsewhere the FATE + * transaction can resume. + */ + private void blockIfHadoopShutdown(FateId fateId, Exception e) { + if (ShutdownUtil.isShutdownInProgress()) { + + if (e instanceof AcceptableException) { + log.debug("Ignoring exception possibly caused by Hadoop Shutdown hook. {} ", fateId, e); + } else if (isIOException(e)) { + log.info("Ignoring exception likely caused by Hadoop Shutdown hook. {} ", fateId, e); + } else { + // sometimes code will catch an IOException caused by the hadoop shutdown hook and throw + // another exception without setting the cause. + log.warn("Ignoring exception possibly caused by Hadoop Shutdown hook. {} ", fateId, e); + } + + while (true) { + // Nothing is going to work well at this point, so why even try. Just wait for the end, + // preventing this FATE thread from processing further work and likely failing. + sleepUninterruptibly(1, MINUTES); + } + } + } + + private void transitionToFailed(FateTxStore<T> txStore, Exception e) { + final String msg = "Failed to execute Repo " + txStore.getID(); + // Certain FATE ops that throw exceptions don't need to be propagated up to the Monitor + // as a warning. They're a normal, handled failure condition. 
+ if (e instanceof AcceptableException) { + var tableOpEx = (AcceptableThriftTableOperationException) e; + log.info("{} for table:{}({}) saw acceptable exception: {}", msg, tableOpEx.getTableName(), + tableOpEx.getTableId(), tableOpEx.getDescription()); + } else { + log.warn(msg, e); + } + txStore.setTransactionInfo(TxInfo.EXCEPTION, e); + txStore.setStatus(FAILED_IN_PROGRESS); + log.info("Updated status for Repo with {} to FAILED_IN_PROGRESS", txStore.getID()); + } + + private void processFailed(FateTxStore<T> txStore, Repo<T> op) { + while (op != null) { + undo(txStore.getID(), op); + + txStore.pop(); + op = txStore.top(); + } + + txStore.setStatus(FAILED); + doCleanUp(txStore); + } + + private void doCleanUp(FateTxStore<T> txStore) { + Boolean autoClean = (Boolean) txStore.getTransactionInfo(TxInfo.AUTO_CLEAN); + if (autoClean != null && autoClean) { + txStore.delete(); + } else { + // no longer need persisted operations, so delete them to save space in case + // TX is never cleaned up... + while (txStore.top() != null) { + txStore.pop(); + } + } + } + + private void undo(FateId fateId, Repo<T> op) { + try { + op.undo(fateId, environment); + } catch (Exception e) { + log.warn("Failed to undo Repo, " + fateId, e); + } + } + + protected boolean flagStop() { + boolean setStop = stop.compareAndSet(false, true); + if (setStop) { + runnerLog.trace("set stop for {}", threadId); + } + return setStop; + } + + protected boolean isFlaggedToStop() { + return stop.get(); + } + + @Override + public String toString() { + return "threadId:" + threadId + " stop:" + stop.get(); + } + + } + + protected long executeIsReady(FateId fateId, Repo<T> op) throws Exception { + var startTime = Timer.startNew(); + var deferTime = op.isReady(fateId, environment); + log.debug("Running {}.isReady() {} took {} ms and returned {}", op.getName(), fateId, + startTime.elapsed(MILLISECONDS), deferTime); + return deferTime; + } + + protected Repo<T> executeCall(FateId fateId, Repo<T> op) throws Exception { + var startTime = Timer.startNew(); + var next = op.call(fateId, environment); + log.debug("Running {}.call() {} took {} ms and returned {}", op.getName(), fateId, + startTime.elapsed(MILLISECONDS), next == null ? "null" : next.getName()); + + return next; + } + + @Override + public String toString() { + return String.format("FateExecutor:{FateOps=%s,Name=%s,PoolSize:%s,TransactionRunners:%s}", + fateOps, name, runningTxRunners.size(), runningTxRunners); + } +} diff --cc test/src/main/java/org/apache/accumulo/test/fate/FateITBase.java index d965881c17,0000000000..c526cd6f75 mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FateITBase.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateITBase.java @@@ -1,570 -1,0 +1,642 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.fate; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED_IN_PROGRESS; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.IN_PROGRESS; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.NEW; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUBMITTED; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.UNKNOWN; +import static org.apache.accumulo.test.fate.FateTestUtil.TEST_FATE_OP; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; ++import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; ++import java.util.concurrent.atomic.AtomicReference; + +import org.apache.accumulo.core.conf.ConfigurationCopy; ++import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.fate.AbstractFateStore; +import org.apache.accumulo.core.fate.Fate; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateStore; +import org.apache.accumulo.core.fate.ReadOnlyFateStore; +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; +import org.apache.accumulo.core.fate.Repo; ++import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.harness.SharedMiniClusterBase; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.test.fate.FateTestRunner.TestEnv; +import org.apache.accumulo.test.util.Wait; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class FateITBase extends SharedMiniClusterBase implements FateTestRunner<TestEnv> { + + private static final Logger LOG = LoggerFactory.getLogger(FateITBase.class); + + private static CountDownLatch callStarted; + private static CountDownLatch finishCall; + private static CountDownLatch undoLatch; ++ private static AtomicReference<Throwable> interruptedException = new AtomicReference<>(); + + private enum ExceptionLocation { + CALL, IS_READY + } + + public static class TestRepo implements Repo<TestEnv> { + private static final long serialVersionUID = 1L; + + private final String data; + + public TestRepo() { + this("test"); + } + + public TestRepo(String data) { + this.data = data; + } + + @Override + public long isReady(FateId fateId, TestEnv environment) throws Exception { + return 0; + } + + @Override + public String getName() { + return "TestRepo_" + data; + } + + @Override + public Repo<TestEnv> call(FateId fateId, TestEnv environment) throws Exception { + LOG.debug("Entering call {}", 
fateId); + try { + FateITBase.inCall(); + return null; + } finally { + LOG.debug("Leaving call {}", fateId); + } + } + + @Override + public void undo(FateId fateId, TestEnv environment) throws Exception { + + } + + @Override + public String getReturn() { + return data + "_ret"; + } + } + + public static class TestOperationFails implements Repo<TestEnv> { + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(TestOperationFails.class); + private static List<String> undoOrder = new ArrayList<>(); + private static final int TOTAL_NUM_OPS = 3; + private int opNum; + private final String opName; + private final ExceptionLocation location; + + public TestOperationFails(int opNum, ExceptionLocation location) { + this.opNum = opNum; + this.opName = "OP" + opNum; + this.location = location; + } + + @Override + public long isReady(FateId fateId, TestEnv environment) throws Exception { + LOG.debug("{} {} Entered isReady()", opName, fateId); + if (location == ExceptionLocation.IS_READY) { + if (opNum < TOTAL_NUM_OPS) { + return 0; + } else { + throw new Exception(opName + " " + fateId + " isReady() failed - this is expected"); + } + } else { + return 0; + } + } + + @Override + public String getName() { + return getClass().getName(); + } + + @Override + public void undo(FateId fateId, TestEnv environment) throws Exception { + LOG.debug("{} {} Entered undo()", opName, fateId); + undoOrder.add(opName); + undoLatch.countDown(); + } + + @Override + public Repo<TestEnv> call(FateId fateId, TestEnv environment) throws Exception { + LOG.debug("{} {} Entered call()", opName, fateId); + if (location == ExceptionLocation.CALL) { + if (opNum < TOTAL_NUM_OPS) { + return new TestOperationFails(++opNum, location); + } else { + throw new Exception(opName + " " + fateId + " call() failed - this is expected"); + } + } else { + return new TestOperationFails(++opNum, location); + } + } + + @Override + public String getReturn() { + return "none"; + } + } + + /** + * Test Repo that allows configuring a delay time to be returned in isReady(). 
+ */ + public static class DeferredTestRepo implements Repo<TestEnv> { + private static final long serialVersionUID = 1L; + + private final String data; + + // These are static as we don't want to serialize them and they should + // be shared across all instances during the test + private static final AtomicInteger executedCalls = new AtomicInteger(); + private static final AtomicLong delay = new AtomicLong(); + private static final CountDownLatch callLatch = new CountDownLatch(1); + + public DeferredTestRepo(String data) { + this.data = data; + } + + @Override + public long isReady(FateId fateId, TestEnv environment) { + LOG.debug("{} delayed {}", fateId, delay.get()); + return delay.get(); + } + + @Override + public String getName() { + return "TestRepo_" + data; + } + + @Override + public Repo<TestEnv> call(FateId fateId, TestEnv environment) throws Exception { + callLatch.await(); + LOG.debug("Executing call {}, total executed {}", fateId, executedCalls.incrementAndGet()); + return null; + } + + @Override + public void undo(FateId fateId, TestEnv environment) { + + } + + @Override + public String getReturn() { + return data + "_ret"; + } + } + + @Test + @Timeout(30) + public void testTransactionStatus() throws Exception { + executeTest(this::testTransactionStatus); + } + + protected void testTransactionStatus(FateStore<TestEnv> store, ServerContext sctx) + throws Exception { + Fate<TestEnv> fate = initializeFate(store); + try { + + // Wait for the transaction runner to be scheduled. + Thread.sleep(Fate.INITIAL_DELAY.toMillis() * 2); + + callStarted = new CountDownLatch(1); + finishCall = new CountDownLatch(1); + + FateId fateId = fate.startTransaction(); + assertEquals(TStatus.NEW, getTxStatus(sctx, fateId)); + fate.seedTransaction(TEST_FATE_OP, fateId, new TestRepo("testTransactionStatus"), true, + "Test Op"); + assertEquals(TStatus.SUBMITTED, getTxStatus(sctx, fateId)); + // wait for call() to be called + callStarted.await(); + assertEquals(IN_PROGRESS, getTxStatus(sctx, fateId)); + // tell the op to exit the method + finishCall.countDown(); + // Check that it transitions to SUCCESSFUL and then removed (UNKNOWN) + final var sawSuccess = new AtomicBoolean(false); + Wait.waitFor(() -> { + TStatus s; + switch (s = getTxStatus(sctx, fateId)) { + case IN_PROGRESS: + if (sawSuccess.get()) { + fail("Should never see IN_PROGRESS after seeing SUCCESSFUL"); + } + break; + case SUCCESSFUL: + // expected, but might be too quick to be detected + if (sawSuccess.compareAndSet(false, true)) { + LOG.debug("Saw expected transaction status change to SUCCESSFUL"); + } + break; + case UNKNOWN: + if (!sawSuccess.get()) { + LOG.debug("Never saw transaction status change to SUCCESSFUL, but that's okay"); + } + return true; + default: + fail("Saw unexpected status: " + s); + } + // keep waiting for UNKNOWN + return false; + }, SECONDS.toMillis(30), 10); + } finally { + fate.shutdown(10, TimeUnit.MINUTES); + } + } + + @Test + public void testCancelWhileNew() throws Exception { + executeTest(this::testCancelWhileNew); + } + + protected void testCancelWhileNew(FateStore<TestEnv> store, ServerContext sctx) throws Exception { + Fate<TestEnv> fate = initializeFate(store); + try { + + // Wait for the transaction runner to be scheduled. 
+ Thread.sleep(Fate.INITIAL_DELAY.toMillis() * 2); + + callStarted = new CountDownLatch(1); + finishCall = new CountDownLatch(1); + + FateId fateId = fate.startTransaction(); + LOG.debug("Starting test testCancelWhileNew with {}", fateId); + assertEquals(NEW, getTxStatus(sctx, fateId)); + // cancel the transaction + assertTrue(fate.cancel(fateId)); + assertTrue( + FAILED_IN_PROGRESS == getTxStatus(sctx, fateId) || FAILED == getTxStatus(sctx, fateId)); + fate.seedTransaction(TEST_FATE_OP, fateId, new TestRepo("testCancelWhileNew"), true, + "Test Op"); + Wait.waitFor(() -> FAILED == getTxStatus(sctx, fateId)); + // nothing should have run + assertEquals(1, callStarted.getCount()); + fate.delete(fateId); + assertEquals(UNKNOWN, getTxStatus(sctx, fateId)); + } finally { + fate.shutdown(10, TimeUnit.MINUTES); + } + } + + @Test + public void testCancelWhileSubmittedAndRunning() throws Exception { + executeTest(this::testCancelWhileSubmittedAndRunning); + } + + protected void testCancelWhileSubmittedAndRunning(FateStore<TestEnv> store, ServerContext sctx) + throws Exception { + Fate<TestEnv> fate = initializeFate(store); + try { + + // Wait for the transaction runner to be scheduled. + Thread.sleep(Fate.INITIAL_DELAY.toMillis() * 2); + + callStarted = new CountDownLatch(1); + finishCall = new CountDownLatch(1); + + FateId fateId = fate.startTransaction(); + LOG.debug("Starting test testCancelWhileSubmitted with {}", fateId); + assertEquals(NEW, getTxStatus(sctx, fateId)); + fate.seedTransaction(TEST_FATE_OP, fateId, new TestRepo("testCancelWhileSubmittedAndRunning"), + false, "Test Op"); + Wait.waitFor(() -> IN_PROGRESS == getTxStatus(sctx, fateId)); + // This is false because the transaction runner has reserved the FaTe + // transaction. + assertFalse(fate.cancel(fateId)); + callStarted.await(); + finishCall.countDown(); + Wait.waitFor(() -> IN_PROGRESS != getTxStatus(sctx, fateId)); + fate.delete(fateId); + assertEquals(UNKNOWN, getTxStatus(sctx, fateId)); + } finally { + fate.shutdown(10, TimeUnit.MINUTES); + } + } + + @Test + public void testCancelWhileInCall() throws Exception { + executeTest(this::testCancelWhileInCall); + } + + protected void testCancelWhileInCall(FateStore<TestEnv> store, ServerContext sctx) + throws Exception { + Fate<TestEnv> fate = initializeFate(store); + try { + + // Wait for the transaction runner to be scheduled. 
+ Thread.sleep(Fate.INITIAL_DELAY.toMillis() * 2); + + callStarted = new CountDownLatch(1); + finishCall = new CountDownLatch(1); + + FateId fateId = fate.startTransaction(); + LOG.debug("Starting test testCancelWhileInCall with {}", fateId); + assertEquals(NEW, getTxStatus(sctx, fateId)); + fate.seedTransaction(TEST_FATE_OP, fateId, new TestRepo("testCancelWhileInCall"), true, + "Test Op"); + assertEquals(SUBMITTED, getTxStatus(sctx, fateId)); + // wait for call() to be called + callStarted.await(); + // cancel the transaction + assertFalse(fate.cancel(fateId)); + finishCall.countDown(); + } finally { + fate.shutdown(10, TimeUnit.MINUTES); + } + + } + + @Test + @Timeout(30) + public void testDeferredOverflow() throws Exception { + // Set a maximum deferred map size of 10 transactions so that when the 11th + // is seen the Fate store should clear the deferred map and mark + // the flag as overflow so that all the deferred transactions will be run + executeTest(this::testDeferredOverflow, 10, AbstractFateStore.DEFAULT_FATE_ID_GENERATOR); + } + + protected void testDeferredOverflow(FateStore<TestEnv> store, ServerContext sctx) + throws Exception { + Fate<TestEnv> fate = initializeFate(store); + try { + + // Wait for the transaction runner to be scheduled. + Thread.sleep(Fate.INITIAL_DELAY.toMillis() * 2); + + DeferredTestRepo.executedCalls.set(0); + // Initialize the repo to have a delay of 30 seconds + // so it will be deferred when submitted + DeferredTestRepo.delay.set(30000); + + Set<FateId> transactions = new HashSet<>(); + + // Start by creating 10 transactions that are all deferred which should + // fill up the deferred map with all 10 as we set the max deferred limit + // to only allow 10 transactions + for (int i = 0; i < 10; i++) { + submitDeferred(fate, sctx, transactions); + } + + // Verify all 10 are deferred in the map and each will + // We should not be in an overflow state yet + Wait.waitFor(() -> store.getDeferredCount() == 10); + assertFalse(store.isDeferredOverflow()); + + // After verifying all 10 are deferred, submit another 10 + // which should trigger an overflow. We are blocking in the + // call method of DeferredTestRepo at this point using a countdown + // latch to prevent fate executor from running early and clearing + // the deferred overflow flag before we can check it below + for (int i = 0; i < 10; i++) { + submitDeferred(fate, sctx, transactions); + } + // Verify deferred overflow is true and map is now empty + Wait.waitFor(() -> store.getDeferredCount() == 0); + Wait.waitFor(store::isDeferredOverflow); + + // Set the delay to 0 and countdown so we will process the + // call method in the repos. 
We need to change the delay because + // due to the async nature of Fate it's possible some of the submitted + // repos previously wouldn't be processed in the first batch until + // after the flag was cleared which would trigger a long delay again + DeferredTestRepo.delay.set(0); + DeferredTestRepo.callLatch.countDown(); + + // Verify the flag was cleared and everything ran + Wait.waitFor(() -> !store.isDeferredOverflow()); + Wait.waitFor(() -> DeferredTestRepo.executedCalls.get() == 20); + + // Verify all 20 unique transactions finished + Wait.waitFor(() -> { + transactions.removeIf(fateId -> getTxStatus(sctx, fateId) == UNKNOWN); + return transactions.isEmpty(); + }); + + } finally { + fate.shutdown(10, TimeUnit.MINUTES); + } + } + + @Test + @Timeout(30) + public void testRepoFails() throws Exception { + // Set a maximum deferred map size of 10 transactions so that when the 11th + // is seen the Fate store should clear the deferred map and mark + // the flag as overflow so that all the deferred transactions will be run + executeTest(this::testRepoFails, 10, AbstractFateStore.DEFAULT_FATE_ID_GENERATOR); + } + + protected void testRepoFails(FateStore<TestEnv> store, ServerContext sctx) throws Exception { + /* + * This test ensures that when an exception occurs in a Repo's call() or isReady() methods, that + * undo() will be called back up the chain of Repo's and in the correct order. The test works as + * follows: 1) Repo1 is called and returns Repo2, 2) Repo2 is called and returns Repo3, 3) Repo3 + * is called and throws an exception (in call() or isReady()). It is then expected that: 1) + * undo() is called on Repo3, 2) undo() is called on Repo2, 3) undo() is called on Repo1 + */ + Fate<TestEnv> fate = initializeFate(store); + try { + + // Wait for the transaction runner to be scheduled. 
+ Thread.sleep(Fate.INITIAL_DELAY.toMillis() * 2); + + List<String> expectedUndoOrder = List.of("OP3", "OP2", "OP1"); + /* + * Test exception in call() + */ + TestOperationFails.undoOrder = new ArrayList<>(); + undoLatch = new CountDownLatch(TestOperationFails.TOTAL_NUM_OPS); + FateId fateId = fate.startTransaction(); + assertEquals(NEW, getTxStatus(sctx, fateId)); + fate.seedTransaction(TEST_FATE_OP, fateId, new TestOperationFails(1, ExceptionLocation.CALL), + false, "Test Op Fails"); + // Wait for all the undo() calls to complete + undoLatch.await(); + assertEquals(expectedUndoOrder, TestOperationFails.undoOrder); + assertEquals(FAILED, fate.waitForCompletion(fateId)); + assertTrue(fate.getException(fateId).getMessage().contains("call() failed")); + /* + * Test exception in isReady() + */ + TestOperationFails.undoOrder = new ArrayList<>(); + undoLatch = new CountDownLatch(TestOperationFails.TOTAL_NUM_OPS); + fateId = fate.startTransaction(); + assertEquals(NEW, getTxStatus(sctx, fateId)); + fate.seedTransaction(TEST_FATE_OP, fateId, + new TestOperationFails(1, ExceptionLocation.IS_READY), false, "Test Op Fails"); + // Wait for all the undo() calls to complete + undoLatch.await(); + assertEquals(expectedUndoOrder, TestOperationFails.undoOrder); + assertEquals(FAILED, fate.waitForCompletion(fateId)); + assertTrue(fate.getException(fateId).getMessage().contains("isReady() failed")); + } finally { + fate.shutdown(10, TimeUnit.MINUTES); + } + } + + @Test + @Timeout(30) + public void testNoWriteAfterDelete() throws Exception { + executeTest(this::testNoWriteAfterDelete); + } + + protected void testNoWriteAfterDelete(FateStore<TestEnv> store, ServerContext sctx) + throws Exception { + final FateId fateId = store.create(); + final Repo<TestEnv> repo = new TestRepo("testNoWriteAfterDelete"); + + var txStore = store.reserve(fateId); + + // all write ops should be ok after reservation + assertDoesNotThrow(() -> txStore.push(repo)); + assertDoesNotThrow(() -> txStore.setStatus(ReadOnlyFateStore.TStatus.SUCCESSFUL)); + assertDoesNotThrow(txStore::pop); + assertDoesNotThrow(() -> txStore.setTransactionInfo(Fate.TxInfo.FATE_OP, TEST_FATE_OP)); + assertDoesNotThrow(txStore::delete); + + // test that all write ops result in an exception since the tx has been deleted + assertThrows(Exception.class, () -> txStore.push(repo)); + assertThrows(Exception.class, () -> txStore.setStatus(ReadOnlyFateStore.TStatus.SUCCESSFUL)); + assertThrows(Exception.class, txStore::pop); + assertThrows(Exception.class, + () -> txStore.setTransactionInfo(Fate.TxInfo.FATE_OP, TEST_FATE_OP)); + assertThrows(Exception.class, txStore::delete); + } + + private void submitDeferred(Fate<TestEnv> fate, ServerContext sctx, Set<FateId> transactions) { + FateId fateId = fate.startTransaction(); + transactions.add(fateId); + assertEquals(TStatus.NEW, getTxStatus(sctx, fateId)); + fate.seedTransaction(TEST_FATE_OP, fateId, new DeferredTestRepo("testDeferredOverflow"), true, + "Test Op"); + assertEquals(TStatus.SUBMITTED, getTxStatus(sctx, fateId)); + } + + protected Fate<TestEnv> initializeFate(FateStore<TestEnv> store) { + return new Fate<>(new TestEnv(), store, false, r -> r + "", + FateTestUtil.updateFateConfig(new ConfigurationCopy(), 1, "AllFateOps"), + new ScheduledThreadPoolExecutor(2)); + } + + protected abstract TStatus getTxStatus(ServerContext sctx, FateId fateId); + ++ @Test ++ @Timeout(60) ++ public void testShutdownDoesNotFailTx() throws Exception { ++ executeTest(this::testShutdownDoesNotFailTx); ++ } ++ ++ protected void 
testShutdownDoesNotFailTx(FateStore<TestEnv> store, ServerContext sctx) ++ throws Exception { ++ ConfigurationCopy config = new ConfigurationCopy(); ++ config.set(Property.GENERAL_THREADPOOL_SIZE, "2"); ++ ++ Fate<TestEnv> fate = initializeFate(store); ++ ++ // Wait for the transaction runner to be scheduled. ++ UtilWaitThread.sleep(3000); ++ ++ callStarted = new CountDownLatch(1); ++ finishCall = new CountDownLatch(1); ++ ++ FateId txid = fate.startTransaction(); ++ assertEquals(TStatus.NEW, getTxStatus(sctx, txid)); ++ ++ fate.seedTransaction(TEST_FATE_OP, txid, new TestRepo("testShutdownDoesNotFailTx"), true, ++ "Test Op"); ++ ++ assertEquals(TStatus.SUBMITTED, getTxStatus(sctx, txid)); ++ ++ // wait for call() to be called ++ callStarted.await(); ++ assertEquals(IN_PROGRESS, getTxStatus(sctx, txid)); ++ ++ // shutdown fate ++ fate.shutdown(0, SECONDS); ++ ++ // tell the op to exit the method ++ Wait.waitFor(() -> interruptedException.get() != null); ++ interruptedException.set(null); ++ ++ // restart fate ++ assertEquals(IN_PROGRESS, getTxStatus(sctx, txid)); ++ fate = initializeFate(store); ++ assertEquals(IN_PROGRESS, getTxStatus(sctx, txid)); ++ ++ // Restarting the transaction runners will retry the in-progress ++ // transaction. Reset the CountDownLatch's to confirm. ++ callStarted = new CountDownLatch(1); ++ finishCall = new CountDownLatch(1); ++ ++ callStarted.await(); ++ assertEquals(IN_PROGRESS, getTxStatus(sctx, txid)); ++ finishCall.countDown(); ++ ++ // This should complete normally, cleaning up the tx and deleting it from ZK ++ TStatus status = getTxStatus(sctx, txid); ++ while (status != TStatus.UNKNOWN) { ++ Thread.sleep(100); ++ status = getTxStatus(sctx, txid); ++ } ++ assertNull(interruptedException.get()); ++ } ++ + private static void inCall() throws InterruptedException { + // signal that call started + callStarted.countDown(); - // wait for the signal to exit the method - finishCall.await(); ++ try { ++ // wait for the signal to exit the method ++ finishCall.await(); ++ } catch (InterruptedException e) { ++ LOG.debug("InterruptedException occurred inCall."); ++ interruptedException.set(e); ++ throw e; ++ } + } +}
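
Note for readers skimming the diff above: the key behavioral change is that FateExecutor's TransactionRunner now distinguishes interrupts caused by FATE shutdown from genuine failures before transitioning a transaction to FAILED, so an in-progress transaction can resume after a restart instead of being failed. Below is a minimal, standalone sketch of that detection pattern; the class and method names here (InterruptCheckDemo, wasInterrupted) are illustrative only, while the real logic lives in FateExecutor.isInterruptedException and the runner's catch blocks shown in the diff.

    // Standalone sketch of the interrupt-detection pattern added in this commit.
    public class InterruptCheckDemo {

        // Walk the throwable, its suppressed exceptions, and its cause chain
        // looking for an InterruptedException anywhere in the tree.
        static boolean wasInterrupted(Throwable e) {
            if (e == null) {
                return false;
            }
            if (e instanceof InterruptedException) {
                return true;
            }
            for (Throwable suppressed : e.getSuppressed()) {
                if (wasInterrupted(suppressed)) {
                    return true;
                }
            }
            return wasInterrupted(e.getCause());
        }

        public static void main(String[] args) {
            // Simulate a FATE step that wraps an interrupt in another exception,
            // as can happen when shutdownNow() interrupts a worker mid-operation.
            Exception wrapped = new RuntimeException("step failed",
                new InterruptedException("worker interrupted during shutdown"));

            if (wasInterrupted(wrapped)) {
                // In the real runner this is where the transaction is left alone
                // (not transitioned to FAILED) so it can resume after a restart.
                System.out.println("Interrupt detected; leave transaction IN_PROGRESS");
            } else {
                System.out.println("Genuine failure; transition to FAILED_IN_PROGRESS");
            }
        }
    }

Checking the suppressed and cause chains, not just the top-level exception, matters because intermediate layers frequently wrap the interrupt in another exception before it reaches the runner, as the testShutdownDoesNotFailTx test above exercises.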
