This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 7bdba35638 improves fate executor logging (#5824)
7bdba35638 is described below
commit 7bdba35638b0c7c0e11e356f8f39e07f9e66a036
Author: Keith Turner <[email protected]>
AuthorDate: Mon Aug 25 11:24:07 2025 -0400
improves fate executor logging (#5824)
---
.../apache/accumulo/core/fate/FateExecutor.java | 29 +++++++++++++++++++---
1 file changed, 25 insertions(+), 4 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java
b/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java
index ddf3fc81c3..789beae96b 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java
@@ -109,7 +109,11 @@ public class FateExecutor<T> {
final int configured = poolConfigs.get(fateOps);
ThreadPools.resizePool(pool, () -> configured, poolName);
synchronized (runningTxRunners) {
- final int needed = configured - runningTxRunners.size();
+ final int running = runningTxRunners.size();
+ final int needed = configured - running;
+ log.trace("resizing pools configured:{} running:{} needed:{}
fateOps:{}", configured, running,
+ needed, fateOps);
+
if (needed > 0) {
// If the pool grew, then ensure that there is a TransactionRunner for
each thread
for (int i = 0; i < needed; i++) {
@@ -208,6 +212,7 @@ public class FateExecutor<T> {
* finder is shutdown. {@link #isShutdown()} returns true after this is
called.
*/
protected void initiateShutdown() {
+ log.debug("Initiated shutdown {}", fateOps);
transactionExecutor.shutdown();
synchronized (runningTxRunners) {
runningTxRunners.forEach(TransactionRunner::flagStop);
@@ -339,6 +344,8 @@ public class FateExecutor<T> {
// FateExecutor
private final AtomicBoolean stop = new AtomicBoolean(false);
+ private volatile Long threadId = null;
+
private Optional<FateTxStore<T>> reserveFateTx() throws
InterruptedException {
while (fate.getKeepRunning().get() && !stop.get()) {
FateId unreservedFateId = workQueue.poll(100, MILLISECONDS);
@@ -358,6 +365,9 @@ public class FateExecutor<T> {
@Override
public void run() {
runningTxRunners.add(this);
+ runnerLog.trace("A TransactionRunner is starting for {} {} ",
fate.getStore().type(),
+ fateOps);
+ threadId = Thread.currentThread().getId();
try {
while (fate.getKeepRunning().get() && !stop.get()) {
FateTxStore<T> txStore = null;
@@ -410,8 +420,9 @@ public class FateExecutor<T> {
}
}
} finally {
- log.trace("A TransactionRunner is exiting...");
+ log.trace("A TransactionRunner is exiting for {} {}",
fate.getStore().type(), fateOps);
Preconditions.checkState(runningTxRunners.remove(this));
+ threadId = null;
}
}
@@ -523,13 +534,22 @@ public class FateExecutor<T> {
}
protected boolean flagStop() {
- return stop.compareAndSet(false, true);
+ boolean setStop = stop.compareAndSet(false, true);
+ if (setStop) {
+ runnerLog.trace("set stop for {}", threadId);
+ }
+ return setStop;
}
protected boolean isFlaggedToStop() {
return stop.get();
}
+ @Override
+ public String toString() {
+ return "threadId:" + threadId + " stop:" + stop.get();
+ }
+
}
protected long executeIsReady(FateId fateId, Repo<T> op) throws Exception {
@@ -551,6 +571,7 @@ public class FateExecutor<T> {
@Override
public String toString() {
- return String.format("FateExecutor:{FateOps=%s,PoolSize:%s}", fateOps,
runningTxRunners.size());
+ return
String.format("FateExecutor:{FateOps=%s,PoolSize:%s,TransactionRunners:%s}",
fateOps,
+ runningTxRunners.size(), runningTxRunners);
}
}