This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new e69d8b39dd Recommend property change when Fate threads are mostly idle (#5129) e69d8b39dd is described below commit e69d8b39dd4ecd8bf355e315e954d8db9059b54d Author: Dave Marion <dlmar...@apache.org> AuthorDate: Fri Dec 6 16:01:31 2024 -0500 Recommend property change when Fate threads are mostly idle (#5129) Added a new property that controls how often the background thread that resizes the Fate thread pool checks the number of idle Fate threads. The new logic logs a warning recommending that the Fate thread pool size property be increased when the number of idle Fate threads was zero in at least 95% of the samples taken over the check interval, i.e. when the Fate threads are almost never idle. Co-authored-by: Keith Turner <ktur...@apache.org> --- .../org/apache/accumulo/core/conf/Property.java | 6 ++++ .../java/org/apache/accumulo/core/fate/Fate.java | 41 +++++++++++++++++++++- 2 files changed, 46 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 98412a97d7..b9b7f72181 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -425,6 +425,12 @@ public enum Property { "The number of threads used to run fault-tolerant executions (FATE)." + " These are primarily table operations like merge.", "1.4.3"), + MANAGER_FATE_IDLE_CHECK_INTERVAL("manager.fate.idle.check.interval", "60m", + PropertyType.TIMEDURATION, + "The interval at which to check if the number of idle Fate threads has consistently been zero." + + " The way this is checked is an approximation. Logs a warning in the Manager log to increase" + + " MANAGER_FATE_THREADPOOL_SIZE. 
A value of zero disables this check and has a maximum value of 60m.", + "4.0.0"), MANAGER_STATUS_THREAD_POOL_SIZE("manager.status.threadpool.size", "0", PropertyType.COUNT, "The number of threads to use when fetching the tablet server status for balancing. Zero " + "indicates an unlimited number of threads will be used.", diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java index 1350cce652..f46cc1aa43 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@ -34,6 +34,7 @@ import static org.apache.accumulo.core.util.ShutdownUtil.isIOException; import java.time.Duration; import java.util.EnumSet; import java.util.Optional; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedTransferQueue; import java.util.concurrent.RejectedExecutionException; @@ -83,6 +84,7 @@ public class Fate<T> { private final AtomicBoolean keepRunning = new AtomicBoolean(true); private final TransferQueue<FateId> workQueue; private final Thread workFinder; + private final ConcurrentLinkedQueue<Integer> idleCountHistory = new ConcurrentLinkedQueue<>(); public enum TxInfo { TX_NAME, AUTO_CLEAN, EXCEPTION, TX_AGEOFF, RETURN_VALUE @@ -355,7 +357,8 @@ public class Fate<T> { // resize the pool if the property changed ThreadPools.resizePool(pool, conf, Property.MANAGER_FATE_THREADPOOL_SIZE); // If the pool grew, then ensure that there is a TransactionRunner for each thread - int needed = conf.getCount(Property.MANAGER_FATE_THREADPOOL_SIZE) - pool.getQueue().size(); + final int configured = conf.getCount(Property.MANAGER_FATE_THREADPOOL_SIZE); + final int needed = configured - pool.getQueue().size(); if (needed > 0) { for (int i = 0; i < needed; i++) { try { @@ -372,6 +375,41 @@ public class Fate<T> { break; } } + idleCountHistory.clear(); + } else { + // The 
property did not change, but should it based on idle Fate threads? Maintain + // count of the last X minutes of idle Fate threads. If zero 95% of the time, then suggest + // that the + // MANAGER_FATE_THREADPOOL_SIZE be increased. + final long interval = Math.min(60, TimeUnit.MILLISECONDS + .toMinutes(conf.getTimeInMillis(Property.MANAGER_FATE_IDLE_CHECK_INTERVAL))); + if (interval == 0) { + idleCountHistory.clear(); + } else { + if (idleCountHistory.size() >= interval * 2) { // this task runs every 30s + int zeroFateThreadsIdleCount = 0; + for (Integer idleConsumerCount : idleCountHistory) { + if (idleConsumerCount == 0) { + zeroFateThreadsIdleCount++; + } + } + boolean needMoreThreads = + (zeroFateThreadsIdleCount / (double) idleCountHistory.size()) >= 0.95; + if (needMoreThreads) { + log.warn( + "All Fate threads appear to be busy for the last {} minutes," + + " consider increasing property: {}", + interval, Property.MANAGER_FATE_THREADPOOL_SIZE.getKey()); + // Clear the history so that we don't log for interval minutes. + idleCountHistory.clear(); + } else { + while (idleCountHistory.size() >= interval * 2) { + idleCountHistory.remove(); + } + } + } + idleCountHistory.add(workQueue.getWaitingConsumerCount()); + } } }, 3, 30, SECONDS)); this.transactionExecutor = pool; @@ -611,5 +649,6 @@ public class Fate<T> { if (deadResCleanerExecutor != null) { deadResCleanerExecutor.shutdownNow(); } + idleCountHistory.clear(); } }