This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new b29e3110bd Fate and FateExecutor improvements (#5817)
b29e3110bd is described below
commit b29e3110bd4bbfc3384aebd9a3552490df634a3e
Author: Dave Marion <[email protected]>
AuthorDate: Mon Aug 25 15:25:45 2025 -0400
Fate and FateExecutor improvements (#5817)
Modifications to Fate and FateExecutor to improve
correctness.
1. Modified Fate to remove the `startFateExecutors`
method, which could create additional FateExecutor
instances that might never be added to the
`fateExecutors` set if the `FatePoolsWatcher` thread
had already added them first.
2. Modified `FateExecutor.resizeFateExecutor` to add
TransactionRunner instances to the `runningTxRunners`
set when they are created rather than when they start
running, so that the shutdown code has a complete
view of all TransactionRunner instances.
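3. Added additional logging: Fate log messages now include
the FATE store type, and FateExecutor trace-logs each
transaction it processes.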
4. Added a check in the UserFateStore constructor that the
FATE transaction table exists.
Co-authored-by: Keith Turner <[email protected]>
Co-authored-by: Kevin Rathbun <[email protected]>
---
.../java/org/apache/accumulo/core/fate/Fate.java | 47 ++++++++++------------
.../apache/accumulo/core/fate/FateExecutor.java | 41 ++++++++++---------
.../accumulo/core/fate/user/UserFateStore.java | 2 +
.../accumulo/test/fate/FatePoolsWatcherITBase.java | 6 +--
.../org/apache/accumulo/test/fate/FlakyFate.java | 5 ---
5 files changed, 48 insertions(+), 53 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
index 4149bc8539..42b30d88b7 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@ -83,7 +83,8 @@ public class Fate<T> {
private static final Duration POOL_WATCHER_DELAY = Duration.ofSeconds(30);
private final AtomicBoolean keepRunning = new AtomicBoolean(true);
- private final Set<FateExecutor<T>> fateExecutors = new HashSet<>();
+ // Visible for FlakyFate test object
+ protected final Set<FateExecutor<T>> fateExecutors = new HashSet<>();
public enum TxInfo {
FATE_OP, AUTO_CLEAN, EXCEPTION, TX_AGEOFF, RETURN_VALUE
@@ -170,8 +171,9 @@ public class Fate<T> {
public void run() {
// Read from the config here and here only. Must avoid reading the same
property from the
// config more than once since it can change at any point in this
execution
- var poolConfigs = getPoolConfigurations(conf);
- var idleCheckIntervalMillis =
conf.getTimeInMillis(Property.MANAGER_FATE_IDLE_CHECK_INTERVAL);
+ final var poolConfigs = getPoolConfigurations(conf);
+ final var idleCheckIntervalMillis =
+ conf.getTimeInMillis(Property.MANAGER_FATE_IDLE_CHECK_INTERVAL);
// shutdown task: shutdown fate executors whose set of fate operations
are no longer present
// in the config
@@ -183,15 +185,17 @@ public class Fate<T> {
// if this fate executors set of fate ops is no longer present in
the config...
if (!poolConfigs.containsKey(fateExecutor.getFateOps())) {
if (!fateExecutor.isShutdown()) {
- log.debug("The config for {} has changed invalidating {}.
Gracefully shutting down "
- + "the FateExecutor.", getFateConfigProp(), fateExecutor);
+ log.debug(
+ "[{}] The config for {} has changed invalidating {}.
Gracefully shutting down "
+ + "the FateExecutor.",
+ store.type(), getFateConfigProp(), fateExecutor);
fateExecutor.initiateShutdown();
} else if (fateExecutor.isShutdown() && fateExecutor.isAlive()) {
- log.debug("{} has been shutdown, but is still actively working
on transactions.",
- fateExecutor);
+ log.debug("[{}] {} has been shutdown, but is still actively
working on transactions.",
+ store.type(), fateExecutor);
} else if (fateExecutor.isShutdown() && !fateExecutor.isAlive()) {
- log.debug("{} has been shutdown and all threads have safely
terminated.",
- fateExecutor);
+ log.debug("[{}] {} has been shutdown and all threads have safely
terminated.",
+ store.type(), fateExecutor);
fateExecutorsIter.remove();
}
}
@@ -207,6 +211,7 @@ public class Fate<T> {
synchronized (fateExecutors) {
if (fateExecutors.stream().map(FateExecutor::getFateOps)
.noneMatch(fo -> fo.equals(configFateOps))) {
+ log.debug("[{}] Adding FateExecutor for {}", store.type(),
configFateOps);
fateExecutors
.add(new FateExecutor<>(Fate.this, environment, configFateOps,
configPoolSize));
}
@@ -268,16 +273,6 @@ public class Fate<T> {
}
this.deadResCleanerExecutor = deadResCleanerExecutor;
- startFateExecutors(environment, conf, fateExecutors);
- }
-
- protected void startFateExecutors(T environment, AccumuloConfiguration conf,
- Set<FateExecutor<T>> fateExecutors) {
- for (var poolConf : getPoolConfigurations(conf).entrySet()) {
- // no fate threads are running at this point; fine not to synchronize
- fateExecutors
- .add(new FateExecutor<>(this, environment, poolConf.getKey(),
poolConf.getValue()));
- }
}
/**
@@ -382,7 +377,7 @@ public class Fate<T> {
// multiple times for a transaction... but it will only seed once
public void seedTransaction(FateOperation fateOp, FateId fateId, Repo<T>
repo,
boolean autoCleanUp, String goalMessage) {
- log.info("Seeding {} {}", fateId, goalMessage);
+ log.info("[{}] Seeding {} {} {}", store.type(), fateOp, fateId,
goalMessage);
store.seedTransaction(fateOp, fateId, repo, autoCleanUp);
}
@@ -405,16 +400,18 @@ public class Fate<T> {
var txStore = optionalTxStore.orElseThrow();
try {
TStatus status = txStore.getStatus();
- log.info("status is: {}", status);
+ log.info("[{}] status is: {}", store.type(), status);
if (status == NEW || status == SUBMITTED) {
txStore.setTransactionInfo(TxInfo.EXCEPTION, new
TApplicationException(
TApplicationException.INTERNAL_ERROR, "Fate transaction
cancelled by user"));
txStore.setStatus(FAILED_IN_PROGRESS);
- log.info("Updated status for {} to FAILED_IN_PROGRESS because it
was cancelled by user",
- fateId);
+ log.info(
+ "[{}] Updated status for {} to FAILED_IN_PROGRESS because it
was cancelled by user",
+ store.type(), fateId);
return true;
} else {
- log.info("{} cancelled by user but already in progress or finished
state", fateId);
+ log.info("[{}] {} cancelled by user but already in progress or
finished state",
+ store.type(), fateId);
return false;
}
} finally {
@@ -425,7 +422,7 @@ public class Fate<T> {
UtilWaitThread.sleep(500);
}
}
- log.info("Unable to reserve transaction {} to cancel it", fateId);
+ log.info("[{}] Unable to reserve transaction {} to cancel it",
store.type(), fateId);
return false;
}
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java
b/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java
index 5ce288d3e2..56a6c17f44 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java
@@ -22,7 +22,6 @@ import static
com.google.common.util.concurrent.Uninterruptibles.sleepUninterrup
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED;
import static
org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED_IN_PROGRESS;
import static
org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.IN_PROGRESS;
@@ -49,6 +48,7 @@ import java.util.stream.Collectors;
import
org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
import org.apache.accumulo.core.fate.Fate.TxInfo;
import org.apache.accumulo.core.fate.FateStore.FateTxStore;
+import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
import org.apache.accumulo.core.util.ShutdownUtil;
import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.core.util.threads.ThreadPoolNames;
@@ -57,7 +57,6 @@ import org.apache.accumulo.core.util.threads.Threads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/**
@@ -107,30 +106,23 @@ public class FateExecutor<T> {
*/
protected void resizeFateExecutor(Map<Set<Fate.FateOperation>,Integer>
poolConfigs,
long idleCheckIntervalMillis) {
- final var pool = transactionExecutor;
final int configured = poolConfigs.get(fateOps);
- ThreadPools.resizePool(pool, () -> configured, poolName);
+ ThreadPools.resizePool(transactionExecutor, () -> configured, poolName);
synchronized (runningTxRunners) {
final int running = runningTxRunners.size();
final int needed = configured - running;
log.trace("resizing pools configured:{} running:{} needed:{}
fateOps:{}", configured, running,
needed, fateOps);
-
if (needed > 0) {
// If the pool grew, then ensure that there is a TransactionRunner for
each thread
for (int i = 0; i < needed; i++) {
+ final TransactionRunner tr = new TransactionRunner();
try {
- pool.execute(new TransactionRunner());
+ runningTxRunners.add(tr);
+ transactionExecutor.execute(tr);
} catch (RejectedExecutionException e) {
- // RejectedExecutionException could be shutting down
- if (pool.isShutdown()) {
- // The exception is expected in this case, no need to spam the
logs.
- log.trace("Expected error adding transaction runner to FaTE
executor pool. "
- + "The pool is shutdown.", e);
- } else {
- // This is bad, FaTE may no longer work!
- log.error("Unexpected error adding transaction runner to FaTE
executor pool.", e);
- }
+ runningTxRunners.remove(tr);
+ log.error("Unexpected error adding transaction runner to FaTE
executor pool.", e);
break;
}
}
@@ -203,7 +195,6 @@ public class FateExecutor<T> {
/**
* @return the number of currently running transaction runners
*/
- @VisibleForTesting
protected int getNumRunningTxRunners() {
return runningTxRunners.size();
}
@@ -343,6 +334,7 @@ public class FateExecutor<T> {
}
protected class TransactionRunner implements Runnable {
+
// used to signal a TransactionRunner to stop in the case where there are
too many running
// i.e.,
// 1. the property for the pool size decreased so we have to stop excess
TransactionRunners
@@ -350,7 +342,6 @@ public class FateExecutor<T> {
// 2. this FateExecutor is no longer valid from config changes so we need
to shutdown this
// FateExecutor
private final AtomicBoolean stop = new AtomicBoolean(false);
-
private volatile Long threadId = null;
private Optional<FateTxStore<T>> reserveFateTx() throws
InterruptedException {
@@ -378,12 +369,11 @@ public class FateExecutor<T> {
@Override
public void run() {
- runningTxRunners.add(this);
runnerLog.trace("A TransactionRunner is starting for {} {} ",
fate.getStore().type(),
fateOps);
threadId = Thread.currentThread().getId();
try {
- while (fate.getKeepRunning().get() && !stop.get()) {
+ while (fate.getKeepRunning().get() && !isShutdown() && !stop.get()) {
FateTxStore<T> txStore = null;
ExecutionState state = new ExecutionState();
try {
@@ -395,6 +385,8 @@ public class FateExecutor<T> {
}
state.status = txStore.getStatus();
state.op = txStore.top();
+ runnerLog.trace("Processing FATE transaction {} id: {} status:
{}", state.op.getName(),
+ txStore.getID(), state.status);
if (state.status == FAILED_IN_PROGRESS) {
processFailed(txStore, state.op);
} else if (state.status == SUBMITTED || state.status ==
IN_PROGRESS) {
@@ -426,9 +418,18 @@ public class FateExecutor<T> {
}
}
} catch (Exception e) {
- runnerLog.error("Uncaught exception in FATE runner thread.", e);
+ String name = state.op == null ? null : state.op.getName();
+ FateId txid = txStore == null ? null : txStore.getID();
+ runnerLog.error(
+ "Uncaught exception in FATE runner thread processing {} id: {}
status: {}", name,
+ txid, state.status, e);
} finally {
if (txStore != null) {
+ if (runnerLog.isTraceEnabled()) {
+ String name = state.op == null ? null : state.op.getName();
+ runnerLog.trace("Completed FATE transaction {} id: {} status:
{}", name,
+ txStore.getID(), state.status);
+ }
txStore.unreserve(Duration.ofMillis(state.deferTime));
}
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
index f38c50a2e9..5f71550a32 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
@@ -112,6 +112,8 @@ public class UserFateStore<T> extends AbstractFateStore<T> {
super(lockID, isLockHeld, maxDeferred, fateIdGenerator);
this.context = Objects.requireNonNull(context);
this.tableName = Objects.requireNonNull(tableName);
+
Preconditions.checkArgument(this.context.tableOperations().exists(tableName),
+ "user fate store table " + tableName + " does not exist.");
this.writer = Suppliers.memoize(() -> {
try {
return createConditionalWriterForFateTable(this.tableName);
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/FatePoolsWatcherITBase.java
b/test/src/main/java/org/apache/accumulo/test/fate/FatePoolsWatcherITBase.java
index 9a562a7c76..66b80d41dd 100644
---
a/test/src/main/java/org/apache/accumulo/test/fate/FatePoolsWatcherITBase.java
+++
b/test/src/main/java/org/apache/accumulo/test/fate/FatePoolsWatcherITBase.java
@@ -19,7 +19,6 @@
package org.apache.accumulo.test.fate;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.Map;
import java.util.Set;
@@ -716,8 +715,9 @@ public abstract class FatePoolsWatcherITBase extends
SharedMiniClusterBase
@Override
public long isReady(FateId fateId, PoolResizeTestEnv environment) throws
Exception {
environment.numWorkers.incrementAndGet();
- assertTrue(environment.isReadyLatch.await(2, TimeUnit.MINUTES),
- "Timed out waiting for isReady latch");
+ if (!environment.isReadyLatch.await(2, TimeUnit.MINUTES)) {
+ throw new IllegalStateException("Timed out waiting for env latch to be
ready.");
+ }
return 0;
}
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java
b/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java
index 15e429d581..2e79fc4729 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java
@@ -39,11 +39,6 @@ public class FlakyFate<T> extends Fate<T> {
public FlakyFate(T environment, FateStore<T> store, Function<Repo<T>,String>
toLogStrFunc,
AccumuloConfiguration conf) {
super(environment, store, false, toLogStrFunc, conf, new
ScheduledThreadPoolExecutor(2));
- }
-
- @Override
- protected void startFateExecutors(T environment, AccumuloConfiguration conf,
- Set<FateExecutor<T>> fateExecutors) {
for (var poolConfig : getPoolConfigurations(conf).entrySet()) {
fateExecutors.add(
new FlakyFateExecutor<>(this, environment, poolConfig.getKey(),
poolConfig.getValue()));