This is an automated email from the ASF dual-hosted git repository.
krathbun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new dd60da12b3 Small FATE improvements (#5813)
dd60da12b3 is described below
commit dd60da12b327caad99e0f00c1789a83576e0d0c5
Author: Kevin Rathbun <[email protected]>
AuthorDate: Fri Aug 22 12:17:30 2025 -0400
Small FATE improvements (#5813)
* Small FATE improvements
partially addresses #5787
* Improved how the FATE work finders are shut down
* Improved some FATE logs
* Fixed a potential concurrency problem
---
.../java/org/apache/accumulo/core/fate/Fate.java | 4 +-
.../apache/accumulo/core/fate/FateExecutor.java | 173 ++++++++++-----------
2 files changed, 88 insertions(+), 89 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
index c86e8168e9..4149bc8539 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@ -334,7 +334,7 @@ public class Fate<T> {
synchronized (fateExecutors) {
for (var fateExecutor : fateExecutors) {
if (fateExecutor.getFateOps().equals(fateOps)) {
- return fateExecutor.getRunningTxRunners().size();
+ return fateExecutor.getNumRunningTxRunners();
}
}
}
@@ -348,7 +348,7 @@ public class Fate<T> {
@VisibleForTesting
public int getTotalTxRunnersActive() {
synchronized (fateExecutors) {
- return fateExecutors.stream().mapToInt(fe ->
fe.getRunningTxRunners().size()).sum();
+ return
fateExecutors.stream().mapToInt(FateExecutor::getNumRunningTxRunners).sum();
}
}
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java
b/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java
index c986dfd84b..ddf3fc81c3 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java
@@ -56,6 +56,7 @@ import org.apache.accumulo.core.util.threads.Threads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/**
@@ -105,96 +106,95 @@ public class FateExecutor<T> {
protected void resizeFateExecutor(Map<Set<Fate.FateOperation>,Integer>
poolConfigs,
long idleCheckIntervalMillis) {
final var pool = transactionExecutor;
- final var runningTxRunners = getRunningTxRunners();
final int configured = poolConfigs.get(fateOps);
ThreadPools.resizePool(pool, () -> configured, poolName);
- final int needed = configured - runningTxRunners.size();
- if (needed > 0) {
- // If the pool grew, then ensure that there is a TransactionRunner for
each thread
- for (int i = 0; i < needed; i++) {
- try {
- pool.execute(new TransactionRunner());
- } catch (RejectedExecutionException e) {
- // RejectedExecutionException could be shutting down
- if (pool.isShutdown()) {
- // The exception is expected in this case, no need to spam the
logs.
- log.trace("Expected error adding transaction runner to FaTE
executor pool. "
- + "The pool is shutdown.", e);
- } else {
- // This is bad, FaTE may no longer work!
- log.error("Unexpected error adding transaction runner to FaTE
executor pool.", e);
+ synchronized (runningTxRunners) {
+ final int needed = configured - runningTxRunners.size();
+ if (needed > 0) {
+ // If the pool grew, then ensure that there is a TransactionRunner for
each thread
+ for (int i = 0; i < needed; i++) {
+ try {
+ pool.execute(new TransactionRunner());
+ } catch (RejectedExecutionException e) {
+ // RejectedExecutionException could be shutting down
+ if (pool.isShutdown()) {
+ // The exception is expected in this case, no need to spam the
logs.
+ log.trace("Expected error adding transaction runner to FaTE
executor pool. "
+ + "The pool is shutdown.", e);
+ } else {
+ // This is bad, FaTE may no longer work!
+ log.error("Unexpected error adding transaction runner to FaTE
executor pool.", e);
+ }
+ break;
}
- break;
- }
- }
- idleCountHistory.clear();
- } else if (needed < 0) {
- // If we need the pool to shrink, then ensure excess TransactionRunners
are safely
- // stopped.
- // Flag the necessary number of TransactionRunners to safely stop when
they are done
- // work on a transaction.
- int numFlagged = (int) runningTxRunners.stream()
- .filter(FateExecutor.TransactionRunner::isFlaggedToStop).count();
- int numToStop = -1 * (numFlagged + needed);
- for (var runner : runningTxRunners) {
- if (numToStop <= 0) {
- break;
- }
- if (runner.flagStop()) {
- log.trace("Flagging a TransactionRunner to stop...");
- numToStop--;
}
- }
- } else {
- // The pool size did not change, but should it based on idle Fate
threads? Maintain
- // count of the last X minutes of idle Fate threads. If zero 95% of the
time, then
- // suggest that the pool size be increased or the fate ops assigned to
that pool be
- // split into separate pools.
- final long interval = Math.min(60,
TimeUnit.MILLISECONDS.toMinutes(idleCheckIntervalMillis));
- var fateConfigProp = fate.getFateConfigProp();
-
- if (interval == 0) {
idleCountHistory.clear();
+ } else if (needed < 0) {
+ // If we need the pool to shrink, then ensure excess
TransactionRunners are safely
+ // stopped.
+ // Flag the necessary number of TransactionRunners to safely stop when
they are done
+ // work on a transaction.
+ int numFlagged = (int) runningTxRunners.stream()
+ .filter(FateExecutor.TransactionRunner::isFlaggedToStop).count();
+ int numToStop = -1 * (numFlagged + needed);
+ for (var runner : runningTxRunners) {
+ if (numToStop <= 0) {
+ break;
+ }
+ if (runner.flagStop()) {
+ log.trace("Flagging a TransactionRunner to stop...");
+ numToStop--;
+ }
+ }
} else {
- if (idleCountHistory.size() >= interval * 2) { // this task runs every
30s
- int zeroFateThreadsIdleCount = 0;
- for (Integer idleConsumerCount : idleCountHistory) {
- if (idleConsumerCount == 0) {
- zeroFateThreadsIdleCount++;
+ // The pool size did not change, but should it based on idle Fate
threads? Maintain
+ // count of the last X minutes of idle Fate threads. If zero 95% of
the time, then
+ // suggest that the pool size be increased or the fate ops assigned to
that pool be
+ // split into separate pools.
+ final long interval =
+ Math.min(60,
TimeUnit.MILLISECONDS.toMinutes(idleCheckIntervalMillis));
+ var fateConfigProp = fate.getFateConfigProp();
+
+ if (interval == 0) {
+ idleCountHistory.clear();
+ } else {
+ if (idleCountHistory.size() >= interval * 2) { // this task runs
every 30s
+ int zeroFateThreadsIdleCount = 0;
+ for (Integer idleConsumerCount : idleCountHistory) {
+ if (idleConsumerCount == 0) {
+ zeroFateThreadsIdleCount++;
+ }
}
- }
- boolean needMoreThreads =
- (zeroFateThreadsIdleCount / (double) idleCountHistory.size()) >=
0.95;
- if (needMoreThreads) {
- fate.getNeedMoreThreadsWarnCount().incrementAndGet();
- log.warn(
- "All {} Fate threads working on the fate ops {} appear to be
busy for "
- + "the last {} minutes. Consider increasing the value for
the "
- + "entry in the property {} or splitting the fate ops
across "
- + "multiple entries/pools.",
- fate.getStore().type(), fateOps, interval,
fateConfigProp.getKey());
- // Clear the history so that we don't log for interval minutes.
- idleCountHistory.clear();
- } else {
- while (idleCountHistory.size() >= interval * 2) {
- idleCountHistory.remove();
+ boolean needMoreThreads =
+ (zeroFateThreadsIdleCount / (double) idleCountHistory.size())
>= 0.95;
+ if (needMoreThreads) {
+ fate.getNeedMoreThreadsWarnCount().incrementAndGet();
+ log.warn(
+ "All {} Fate threads working on the fate ops {} appear to be
busy for "
+ + "the last {} minutes. Consider increasing the value
for the "
+ + "entry in the property {} or splitting the fate ops
across "
+ + "multiple entries/pools.",
+ fate.getStore().type(), fateOps, interval,
fateConfigProp.getKey());
+ // Clear the history so that we don't log for interval minutes.
+ idleCountHistory.clear();
+ } else {
+ while (idleCountHistory.size() >= interval * 2) {
+ idleCountHistory.remove();
+ }
}
}
+ idleCountHistory.add(workQueue.getWaitingConsumerCount());
}
- idleCountHistory.add(workQueue.getWaitingConsumerCount());
}
}
}
/**
- * @return an unmodifiable, shallow copy of the currently running
transaction runners
+ * @return the number of currently running transaction runners
*/
- protected Set<TransactionRunner> getRunningTxRunners() {
- Set<TransactionRunner> copy;
- synchronized (runningTxRunners) {
- copy = new HashSet<>(runningTxRunners);
- }
- return Collections.unmodifiableSet(copy);
+ @VisibleForTesting
+ protected int getNumRunningTxRunners() {
+ return runningTxRunners.size();
}
protected Set<Fate.FateOperation> getFateOps() {
@@ -205,14 +205,14 @@ public class FateExecutor<T> {
* Initiates the shutdown of this FateExecutor. This means the pool
executing TransactionRunners
* will no longer accept new TransactionRunners, the currently running
TransactionRunners will
* terminate after they are done with their current transaction, if
applicable, and the work
- * finder is interrupted. {@link #isShutdown()} returns true after this is
called.
+ * finder is shutdown. {@link #isShutdown()} returns true after this is
called.
*/
protected void initiateShutdown() {
transactionExecutor.shutdown();
synchronized (runningTxRunners) {
runningTxRunners.forEach(TransactionRunner::flagStop);
}
- workFinder.interrupt();
+ // work finder will terminate since this.isShutdown() is true
}
/**
@@ -233,8 +233,8 @@ public class FateExecutor<T> {
if (timeout > 0) {
while (((System.nanoTime() - start) < timeUnit.toNanos(timeout)) &&
isAlive()) {
if (!transactionExecutor.awaitTermination(1, SECONDS)) {
- log.debug("Fate {} is waiting for worker threads for fate ops {} to
terminate",
- fate.getStore().type(), fateOps);
+ log.debug("Fate {} is waiting for {} worker threads for fate ops {}
to terminate",
+ fate.getStore().type(), runningTxRunners.size(), fateOps);
continue;
}
@@ -306,18 +306,17 @@ public class FateExecutor<T> {
}
});
} catch (Exception e) {
- if (!fate.getKeepRunning().get() || isShutdown()) {
- log.debug("Expected failure while attempting to find work for
fate: either fate is "
- + "being shutdown and therefore all fate threads are being
shutdown or the "
- + "fate threads assigned to work on {} were invalidated by
config changes "
- + "and are being shutdown", fateOps, e);
- } else {
- log.warn("Unexpected failure while attempting to find work for
fate", e);
- }
-
+ log.warn("Unexpected failure while attempting to find work for
fate", e);
workQueue.clear();
}
}
+
+ log.debug(
+ "FATE work finder for ops {} is gracefully exiting: either FATE is "
+ + "being shutdown ({}) and therefore all FATE threads are being
shutdown or the "
+ + "FATE threads for the specific ops are being shutdown (due to
FATE shutdown, "
+ + "or due to FATE config changes) ({})",
+ fateOps, !fate.getKeepRunning().get(), isShutdown());
}
private boolean txCancelledWhileNew(TStatus status, Fate.FateOperation
fateOp) {