This is an automated email from the ASF dual-hosted git repository.

krathbun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new dd60da12b3 Small FATE improvements (#5813)
dd60da12b3 is described below

commit dd60da12b327caad99e0f00c1789a83576e0d0c5
Author: Kevin Rathbun <[email protected]>
AuthorDate: Fri Aug 22 12:17:30 2025 -0400

    Small FATE improvements (#5813)
    
    * Small FATE improvements
    
    partially addresses #5787
    
    * Improved how the FATE work finders are shut down
    * Improved some FATE log messages
    * Fixed a potential concurrency problem
---
 .../java/org/apache/accumulo/core/fate/Fate.java   |   4 +-
 .../apache/accumulo/core/fate/FateExecutor.java    | 173 ++++++++++-----------
 2 files changed, 88 insertions(+), 89 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java 
b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
index c86e8168e9..4149bc8539 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@ -334,7 +334,7 @@ public class Fate<T> {
     synchronized (fateExecutors) {
       for (var fateExecutor : fateExecutors) {
         if (fateExecutor.getFateOps().equals(fateOps)) {
-          return fateExecutor.getRunningTxRunners().size();
+          return fateExecutor.getNumRunningTxRunners();
         }
       }
     }
@@ -348,7 +348,7 @@ public class Fate<T> {
   @VisibleForTesting
   public int getTotalTxRunnersActive() {
     synchronized (fateExecutors) {
-      return fateExecutors.stream().mapToInt(fe -> 
fe.getRunningTxRunners().size()).sum();
+      return 
fateExecutors.stream().mapToInt(FateExecutor::getNumRunningTxRunners).sum();
     }
   }
 
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java 
b/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java
index c986dfd84b..ddf3fc81c3 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java
@@ -56,6 +56,7 @@ import org.apache.accumulo.core.util.threads.Threads;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 /**
@@ -105,96 +106,95 @@ public class FateExecutor<T> {
   protected void resizeFateExecutor(Map<Set<Fate.FateOperation>,Integer> 
poolConfigs,
       long idleCheckIntervalMillis) {
     final var pool = transactionExecutor;
-    final var runningTxRunners = getRunningTxRunners();
     final int configured = poolConfigs.get(fateOps);
     ThreadPools.resizePool(pool, () -> configured, poolName);
-    final int needed = configured - runningTxRunners.size();
-    if (needed > 0) {
-      // If the pool grew, then ensure that there is a TransactionRunner for 
each thread
-      for (int i = 0; i < needed; i++) {
-        try {
-          pool.execute(new TransactionRunner());
-        } catch (RejectedExecutionException e) {
-          // RejectedExecutionException could be shutting down
-          if (pool.isShutdown()) {
-            // The exception is expected in this case, no need to spam the 
logs.
-            log.trace("Expected error adding transaction runner to FaTE 
executor pool. "
-                + "The pool is shutdown.", e);
-          } else {
-            // This is bad, FaTE may no longer work!
-            log.error("Unexpected error adding transaction runner to FaTE 
executor pool.", e);
+    synchronized (runningTxRunners) {
+      final int needed = configured - runningTxRunners.size();
+      if (needed > 0) {
+        // If the pool grew, then ensure that there is a TransactionRunner for 
each thread
+        for (int i = 0; i < needed; i++) {
+          try {
+            pool.execute(new TransactionRunner());
+          } catch (RejectedExecutionException e) {
+            // RejectedExecutionException could be shutting down
+            if (pool.isShutdown()) {
+              // The exception is expected in this case, no need to spam the 
logs.
+              log.trace("Expected error adding transaction runner to FaTE 
executor pool. "
+                  + "The pool is shutdown.", e);
+            } else {
+              // This is bad, FaTE may no longer work!
+              log.error("Unexpected error adding transaction runner to FaTE 
executor pool.", e);
+            }
+            break;
           }
-          break;
-        }
-      }
-      idleCountHistory.clear();
-    } else if (needed < 0) {
-      // If we need the pool to shrink, then ensure excess TransactionRunners 
are safely
-      // stopped.
-      // Flag the necessary number of TransactionRunners to safely stop when 
they are done
-      // work on a transaction.
-      int numFlagged = (int) runningTxRunners.stream()
-          .filter(FateExecutor.TransactionRunner::isFlaggedToStop).count();
-      int numToStop = -1 * (numFlagged + needed);
-      for (var runner : runningTxRunners) {
-        if (numToStop <= 0) {
-          break;
-        }
-        if (runner.flagStop()) {
-          log.trace("Flagging a TransactionRunner to stop...");
-          numToStop--;
         }
-      }
-    } else {
-      // The pool size did not change, but should it based on idle Fate 
threads? Maintain
-      // count of the last X minutes of idle Fate threads. If zero 95% of the 
time, then
-      // suggest that the pool size be increased or the fate ops assigned to 
that pool be
-      // split into separate pools.
-      final long interval = Math.min(60, 
TimeUnit.MILLISECONDS.toMinutes(idleCheckIntervalMillis));
-      var fateConfigProp = fate.getFateConfigProp();
-
-      if (interval == 0) {
         idleCountHistory.clear();
+      } else if (needed < 0) {
+        // If we need the pool to shrink, then ensure excess 
TransactionRunners are safely
+        // stopped.
+        // Flag the necessary number of TransactionRunners to safely stop when 
they are done
+        // work on a transaction.
+        int numFlagged = (int) runningTxRunners.stream()
+            .filter(FateExecutor.TransactionRunner::isFlaggedToStop).count();
+        int numToStop = -1 * (numFlagged + needed);
+        for (var runner : runningTxRunners) {
+          if (numToStop <= 0) {
+            break;
+          }
+          if (runner.flagStop()) {
+            log.trace("Flagging a TransactionRunner to stop...");
+            numToStop--;
+          }
+        }
       } else {
-        if (idleCountHistory.size() >= interval * 2) { // this task runs every 
30s
-          int zeroFateThreadsIdleCount = 0;
-          for (Integer idleConsumerCount : idleCountHistory) {
-            if (idleConsumerCount == 0) {
-              zeroFateThreadsIdleCount++;
+        // The pool size did not change, but should it based on idle Fate 
threads? Maintain
+        // count of the last X minutes of idle Fate threads. If zero 95% of 
the time, then
+        // suggest that the pool size be increased or the fate ops assigned to 
that pool be
+        // split into separate pools.
+        final long interval =
+            Math.min(60, 
TimeUnit.MILLISECONDS.toMinutes(idleCheckIntervalMillis));
+        var fateConfigProp = fate.getFateConfigProp();
+
+        if (interval == 0) {
+          idleCountHistory.clear();
+        } else {
+          if (idleCountHistory.size() >= interval * 2) { // this task runs 
every 30s
+            int zeroFateThreadsIdleCount = 0;
+            for (Integer idleConsumerCount : idleCountHistory) {
+              if (idleConsumerCount == 0) {
+                zeroFateThreadsIdleCount++;
+              }
             }
-          }
-          boolean needMoreThreads =
-              (zeroFateThreadsIdleCount / (double) idleCountHistory.size()) >= 
0.95;
-          if (needMoreThreads) {
-            fate.getNeedMoreThreadsWarnCount().incrementAndGet();
-            log.warn(
-                "All {} Fate threads working on the fate ops {} appear to be 
busy for "
-                    + "the last {} minutes. Consider increasing the value for 
the "
-                    + "entry in the property {} or splitting the fate ops 
across "
-                    + "multiple entries/pools.",
-                fate.getStore().type(), fateOps, interval, 
fateConfigProp.getKey());
-            // Clear the history so that we don't log for interval minutes.
-            idleCountHistory.clear();
-          } else {
-            while (idleCountHistory.size() >= interval * 2) {
-              idleCountHistory.remove();
+            boolean needMoreThreads =
+                (zeroFateThreadsIdleCount / (double) idleCountHistory.size()) 
>= 0.95;
+            if (needMoreThreads) {
+              fate.getNeedMoreThreadsWarnCount().incrementAndGet();
+              log.warn(
+                  "All {} Fate threads working on the fate ops {} appear to be 
busy for "
+                      + "the last {} minutes. Consider increasing the value 
for the "
+                      + "entry in the property {} or splitting the fate ops 
across "
+                      + "multiple entries/pools.",
+                  fate.getStore().type(), fateOps, interval, 
fateConfigProp.getKey());
+              // Clear the history so that we don't log for interval minutes.
+              idleCountHistory.clear();
+            } else {
+              while (idleCountHistory.size() >= interval * 2) {
+                idleCountHistory.remove();
+              }
             }
           }
+          idleCountHistory.add(workQueue.getWaitingConsumerCount());
         }
-        idleCountHistory.add(workQueue.getWaitingConsumerCount());
       }
     }
   }
 
   /**
-   * @return an unmodifiable, shallow copy of the currently running 
transaction runners
+   * @return the number of currently running transaction runners
    */
-  protected Set<TransactionRunner> getRunningTxRunners() {
-    Set<TransactionRunner> copy;
-    synchronized (runningTxRunners) {
-      copy = new HashSet<>(runningTxRunners);
-    }
-    return Collections.unmodifiableSet(copy);
+  @VisibleForTesting
+  protected int getNumRunningTxRunners() {
+    return runningTxRunners.size();
   }
 
   protected Set<Fate.FateOperation> getFateOps() {
@@ -205,14 +205,14 @@ public class FateExecutor<T> {
    * Initiates the shutdown of this FateExecutor. This means the pool 
executing TransactionRunners
    * will no longer accept new TransactionRunners, the currently running 
TransactionRunners will
    * terminate after they are done with their current transaction, if 
applicable, and the work
-   * finder is interrupted. {@link #isShutdown()} returns true after this is 
called.
+   * finder is shutdown. {@link #isShutdown()} returns true after this is 
called.
    */
   protected void initiateShutdown() {
     transactionExecutor.shutdown();
     synchronized (runningTxRunners) {
       runningTxRunners.forEach(TransactionRunner::flagStop);
     }
-    workFinder.interrupt();
+    // work finder will terminate since this.isShutdown() is true
   }
 
   /**
@@ -233,8 +233,8 @@ public class FateExecutor<T> {
     if (timeout > 0) {
       while (((System.nanoTime() - start) < timeUnit.toNanos(timeout)) && 
isAlive()) {
         if (!transactionExecutor.awaitTermination(1, SECONDS)) {
-          log.debug("Fate {} is waiting for worker threads for fate ops {} to 
terminate",
-              fate.getStore().type(), fateOps);
+          log.debug("Fate {} is waiting for {} worker threads for fate ops {} 
to terminate",
+              fate.getStore().type(), runningTxRunners.size(), fateOps);
           continue;
         }
 
@@ -306,18 +306,17 @@ public class FateExecutor<T> {
             }
           });
         } catch (Exception e) {
-          if (!fate.getKeepRunning().get() || isShutdown()) {
-            log.debug("Expected failure while attempting to find work for 
fate: either fate is "
-                + "being shutdown and therefore all fate threads are being 
shutdown or the "
-                + "fate threads assigned to work on {} were invalidated by 
config changes "
-                + "and are being shutdown", fateOps, e);
-          } else {
-            log.warn("Unexpected failure while attempting to find work for 
fate", e);
-          }
-
+          log.warn("Unexpected failure while attempting to find work for 
fate", e);
           workQueue.clear();
         }
       }
+
+      log.debug(
+          "FATE work finder for ops {} is gracefully exiting: either FATE is "
+              + "being shutdown ({}) and therefore all FATE threads are being 
shutdown or the "
+              + "FATE threads for the specific ops are being shutdown (due to 
FATE shutdown, "
+              + "or due to FATE config changes) ({})",
+          fateOps, !fate.getKeepRunning().get(), isShutdown());
     }
 
     private boolean txCancelledWhileNew(TStatus status, Fate.FateOperation 
fateOp) {

Reply via email to