This is an automated email from the ASF dual-hosted git repository. mmiller pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 7c3a978 Use new ThreadPools method to clean up code (#2589) 7c3a978 is described below commit 7c3a9783ad01809f7c1bd5cf22471a1ffa421852 Author: Mike Miller <mmil...@apache.org> AuthorDate: Fri Mar 25 07:35:47 2022 -0400 Use new ThreadPools method to clean up code (#2589) --- .../apache/accumulo/server/rpc/TServerUtils.java | 29 ++++++++++------------ .../accumulo/server/util/FileSystemMonitor.java | 26 +++++++++---------- 2 files changed, 25 insertions(+), 30 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java index b176175..ba8ba2d 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java @@ -314,22 +314,19 @@ public class TServerUtils { final ThreadPoolExecutor pool = ThreadPools.getServerThreadPools().createFixedThreadPool( executorThreads, threadTimeOut, TimeUnit.MILLISECONDS, serverName + "-ClientPool", true); // periodically adjust the number of threads we need by checking how busy our threads are - ThreadPools.watchCriticalScheduledTask(ThreadPools.getServerThreadPools() - .createGeneralScheduledExecutorService(conf).scheduleWithFixedDelay(() -> { - // there is a minor race condition between sampling the current state of the thread pool - // and - // adjusting it - // however, this isn't really an issue, since it adjusts periodically anyway - if (pool.getCorePoolSize() <= pool.getActiveCount()) { - int larger = pool.getCorePoolSize() + Math.min(pool.getQueue().size(), 2); - ThreadPools.resizePool(pool, () -> larger, serverName + "-ClientPool"); - } else { - if (pool.getCorePoolSize() > pool.getActiveCount() + 3) { - int smaller = Math.max(executorThreads, pool.getCorePoolSize() - 1); - ThreadPools.resizePool(pool, () -> smaller, serverName + "-ClientPool"); - } - } - }, timeBetweenThreadChecks, timeBetweenThreadChecks, TimeUnit.MILLISECONDS)); + ThreadPools.watchCriticalFixedDelay(conf, timeBetweenThreadChecks, () -> { + // there is a minor race condition between sampling the current state of the thread pool + // and adjusting it however, this isn't really an issue, since it adjusts periodically + if (pool.getCorePoolSize() <= pool.getActiveCount()) { + int larger = pool.getCorePoolSize() + Math.min(pool.getQueue().size(), 2); + ThreadPools.resizePool(pool, () -> larger, serverName + "-ClientPool"); + } else { + if (pool.getCorePoolSize() > pool.getActiveCount() + 3) { + int smaller = Math.max(executorThreads, pool.getCorePoolSize() - 1); + ThreadPools.resizePool(pool, () -> smaller, serverName + "-ClientPool"); + } + } + }); return pool; } diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/FileSystemMonitor.java b/server/base/src/main/java/org/apache/accumulo/server/util/FileSystemMonitor.java index 255f453..1bcc77c 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/FileSystemMonitor.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/FileSystemMonitor.java @@ -29,7 +29,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; @@ -117,20 +116,19 @@ public class FileSystemMonitor { // Create a task to check each mount periodically to see if its state has changed. for (Mount mount : mounts) { - ThreadPools.watchCriticalScheduledTask(ThreadPools.getServerThreadPools() - .createGeneralScheduledExecutorService(conf).scheduleWithFixedDelay( - Threads.createNamedRunnable(mount.mountPoint + "filesystem monitor", () -> { - try { - checkMount(mount); - } catch (final Exception e) { - Halt.halt(-42, new Runnable() { - @Override - public void run() { - log.error("Exception while checking mount points, halting process", e); - } - }); + ThreadPools.watchCriticalFixedDelay(conf, period, + Threads.createNamedRunnable(mount.mountPoint + "filesystem monitor", () -> { + try { + checkMount(mount); + } catch (final Exception e) { + Halt.halt(-42, new Runnable() { + @Override + public void run() { + log.error("Exception while checking mount points, halting process", e); } - }), period, period, TimeUnit.MILLISECONDS)); + }); + } + })); } }