This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new e69d8b39dd Recommend property change when Fate threads are mostly idle 
 (#5129)
e69d8b39dd is described below

commit e69d8b39dd4ecd8bf355e315e954d8db9059b54d
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Fri Dec 6 16:01:31 2024 -0500

    Recommend property change when Fate threads are mostly idle  (#5129)
    
    Added a new property that controls the idle Fate thread check
    performed by the background thread that resizes the Fate thread
    pool. This new logic logs a warning recommending that the Fate
    thread pool size property be increased if the number of idle Fate
    threads was zero in at least 95% of the samples taken over the
    configured interval.
    
    
    Co-authored-by: Keith Turner <ktur...@apache.org>
---
 .../org/apache/accumulo/core/conf/Property.java    |  6 ++++
 .../java/org/apache/accumulo/core/fate/Fate.java   | 41 +++++++++++++++++++++-
 2 files changed, 46 insertions(+), 1 deletion(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java 
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 98412a97d7..b9b7f72181 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -425,6 +425,12 @@ public enum Property {
       "The number of threads used to run fault-tolerant executions (FATE)."
           + " These are primarily table operations like merge.",
       "1.4.3"),
+  MANAGER_FATE_IDLE_CHECK_INTERVAL("manager.fate.idle.check.interval", "60m",
+      PropertyType.TIMEDURATION,
+      "The interval at which to check if the number of idle Fate threads has 
consistently been zero."
+          + " The way this is checked is an approximation. Logs a warning in 
the Manager log to increase"
+          + " MANAGER_FATE_THREADPOOL_SIZE. A value of zero disables this 
check and has a maximum value of 60m.",
+      "4.0.0"),
   MANAGER_STATUS_THREAD_POOL_SIZE("manager.status.threadpool.size", "0", 
PropertyType.COUNT,
       "The number of threads to use when fetching the tablet server status for 
balancing.  Zero "
           + "indicates an unlimited number of threads will be used.",
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java 
b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
index 1350cce652..f46cc1aa43 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@ -34,6 +34,7 @@ import static 
org.apache.accumulo.core.util.ShutdownUtil.isIOException;
 import java.time.Duration;
 import java.util.EnumSet;
 import java.util.Optional;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedTransferQueue;
 import java.util.concurrent.RejectedExecutionException;
@@ -83,6 +84,7 @@ public class Fate<T> {
   private final AtomicBoolean keepRunning = new AtomicBoolean(true);
   private final TransferQueue<FateId> workQueue;
   private final Thread workFinder;
+  private final ConcurrentLinkedQueue<Integer> idleCountHistory = new 
ConcurrentLinkedQueue<>();
 
   public enum TxInfo {
     TX_NAME, AUTO_CLEAN, EXCEPTION, TX_AGEOFF, RETURN_VALUE
@@ -355,7 +357,8 @@ public class Fate<T> {
       // resize the pool if the property changed
       ThreadPools.resizePool(pool, conf, 
Property.MANAGER_FATE_THREADPOOL_SIZE);
       // If the pool grew, then ensure that there is a TransactionRunner for 
each thread
-      int needed = conf.getCount(Property.MANAGER_FATE_THREADPOOL_SIZE) - 
pool.getQueue().size();
+      final int configured = 
conf.getCount(Property.MANAGER_FATE_THREADPOOL_SIZE);
+      final int needed = configured - pool.getQueue().size();
       if (needed > 0) {
         for (int i = 0; i < needed; i++) {
           try {
@@ -372,6 +375,41 @@ public class Fate<T> {
             break;
           }
         }
+        idleCountHistory.clear();
+      } else {
+        // The property did not change, but should it based on idle Fate 
threads? Maintain
+        // count of the last X minutes of idle Fate threads. If zero 95% of 
the time, then suggest
+        // that the
+        // MANAGER_FATE_THREADPOOL_SIZE be increased.
+        final long interval = Math.min(60, TimeUnit.MILLISECONDS
+            
.toMinutes(conf.getTimeInMillis(Property.MANAGER_FATE_IDLE_CHECK_INTERVAL)));
+        if (interval == 0) {
+          idleCountHistory.clear();
+        } else {
+          if (idleCountHistory.size() >= interval * 2) { // this task runs 
every 30s
+            int zeroFateThreadsIdleCount = 0;
+            for (Integer idleConsumerCount : idleCountHistory) {
+              if (idleConsumerCount == 0) {
+                zeroFateThreadsIdleCount++;
+              }
+            }
+            boolean needMoreThreads =
+                (zeroFateThreadsIdleCount / (double) idleCountHistory.size()) 
>= 0.95;
+            if (needMoreThreads) {
+              log.warn(
+                  "All Fate threads appear to be busy for the last {} minutes,"
+                      + " consider increasing property: {}",
+                  interval, Property.MANAGER_FATE_THREADPOOL_SIZE.getKey());
+              // Clear the history so that we don't log for interval minutes.
+              idleCountHistory.clear();
+            } else {
+              while (idleCountHistory.size() >= interval * 2) {
+                idleCountHistory.remove();
+              }
+            }
+          }
+          idleCountHistory.add(workQueue.getWaitingConsumerCount());
+        }
       }
     }, 3, 30, SECONDS));
     this.transactionExecutor = pool;
@@ -611,5 +649,6 @@ public class Fate<T> {
     if (deadResCleanerExecutor != null) {
       deadResCleanerExecutor.shutdownNow();
     }
+    idleCountHistory.clear();
   }
 }

Reply via email to