This is an automated email from the ASF dual-hosted git repository.

edcoleman pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new ec8ae122ed Use fluent-style builder for pool creation, replacing 
overloaded methods (#4384)
ec8ae122ed is described below

commit ec8ae122edfd5c25601694d107f18af014e28383
Author: EdColeman <d...@etcoleman.com>
AuthorDate: Tue Mar 19 13:31:38 2024 -0400

    Use fluent-style builder for pool creation, replacing overloaded methods 
(#4384)
    
    * Use fluent-style builder for pool creation, replacing overloaded methods
    
    * Replaces overloaded createThreadPool methods with a fluent-style builder
    (ThreadPoolExecutorBuilder).
    * Adds ThreadPoolExecutorBuilderTest test.
    * Adds a `createExecutorService` method that does not have an option to
    enable metrics and replaces all occurrences in the code that were calling
    the alternate method with emitThreadPoolMetrics=false to use it.
    
    The `createScheduledExecutorService` method will be refactored in a future
    PR when service initialization is reworked.  See PR #4342 for an example.
    
    This change isolates where metrics can be enabled and makes finding them
    easier.  It also will make reviewing future changes easier because those
    changes will be isolated to places that currently enable thread pool
    metrics.
---
 .../accumulo/core/clientImpl/ClientContext.java    |   9 +-
 .../core/clientImpl/ConditionalWriterImpl.java     |   2 +-
 .../core/clientImpl/InstanceOperationsImpl.java    |   4 +-
 .../core/clientImpl/TableOperationsImpl.java       |   3 +-
 .../core/clientImpl/TabletServerBatchReader.java   |   5 +-
 .../core/clientImpl/TabletServerBatchWriter.java   |   8 +-
 .../accumulo/core/clientImpl/bulk/BulkImport.java  |   8 +-
 .../accumulo/core/file/BloomFilterLayer.java       |   5 +-
 .../file/blockfile/cache/lru/LruBlockCache.java    |   4 +-
 .../blockfile/cache/tinylfu/TinyLfuBlockCache.java |   2 +-
 .../util/compaction/ExternalCompactionUtil.java    |  10 +-
 .../accumulo/core/util/threads/ThreadPools.java    | 419 +++++++++++++--------
 .../core/file/rfile/MultiThreadedRFileTest.java    |   7 +-
 .../threads/ThreadPoolExecutorBuilderTest.java     |  79 ++++
 .../accumulo/server/client/BulkImporter.java       |  10 +-
 .../server/conf/ServerConfigurationFactory.java    |   4 +-
 .../conf/store/impl/PropCacheCaffeineImpl.java     |   7 +-
 .../server/conf/store/impl/PropStoreWatcher.java   |   5 +-
 .../accumulo/server/fs/VolumeManagerImpl.java      |   4 +-
 .../accumulo/server/problems/ProblemReports.java   |   7 +-
 .../apache/accumulo/server/rpc/TServerUtils.java   |   7 +-
 .../server/util/RemoveEntriesForMissingFiles.java  |   4 +-
 .../server/util/VerifyTabletAssignments.java       |   5 +-
 .../server/conf/store/impl/ReadyMonitorTest.java   |   4 +-
 .../coordinator/CompactionCoordinator.java         |   4 +-
 .../accumulo/coordinator/CompactionFinalizer.java  |  11 +-
 .../main/java/org/apache/accumulo/gc/GCRun.java    |   2 +-
 .../java/org/apache/accumulo/manager/Manager.java  |   2 +-
 .../manager/metrics/ReplicationMetrics.java        |   2 +-
 .../accumulo/manager/metrics/fate/FateMetrics.java |   4 +-
 .../accumulo/manager/recovery/RecoveryManager.java |   4 +-
 .../manager/tableOps/bulkVer1/BulkImport.java      |   6 +-
 .../manager/upgrade/UpgradeCoordinator.java        |   9 +-
 .../org/apache/accumulo/tserver/TabletServer.java  |   2 +-
 .../tserver/TabletServerResourceManager.java       |  31 +-
 .../tserver/compactions/CompactionService.java     |   6 +-
 .../compactions/InternalCompactionExecutor.java    |   9 +-
 .../org/apache/accumulo/tserver/log/LogSorter.java |   5 +-
 .../accumulo/tserver/log/TabletServerLogger.java   |   4 +-
 .../metrics/CompactionExecutorsMetrics.java        |   2 +-
 .../accumulo/test/BalanceWithOfflineTableIT.java   |   5 +-
 .../test/functional/BatchWriterFlushIT.java        |   2 +-
 42 files changed, 465 insertions(+), 267 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
index 8f7993812a..d02a2c743e 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
@@ -260,8 +260,9 @@ public class ClientContext implements AccumuloClient {
       submitScannerReadAheadTask(Callable<List<KeyValue>> c) {
     ensureOpen();
     if (scannerReadaheadPool == null) {
-      scannerReadaheadPool = clientThreadPools.createThreadPool(0, 
Integer.MAX_VALUE, 3L, SECONDS,
-          "Accumulo scanner read ahead thread", new SynchronousQueue<>(), 
true);
+      scannerReadaheadPool = clientThreadPools.getPoolBuilder("Accumulo 
scanner read ahead thread")
+          .numCoreThreads(0).numMaxThreads(Integer.MAX_VALUE).withTimeOut(3L, 
SECONDS)
+          .withQueue(new 
SynchronousQueue<>()).enableThreadPoolMetrics().build();
     }
     return scannerReadaheadPool.submit(c);
   }
@@ -269,8 +270,8 @@ public class ClientContext implements AccumuloClient {
   public synchronized void executeCleanupTask(Runnable r) {
     ensureOpen();
     if (cleanupThreadPool == null) {
-      cleanupThreadPool = clientThreadPools.createFixedThreadPool(1, 3, 
SECONDS,
-          "Conditional Writer Cleanup Thread", true);
+      cleanupThreadPool = clientThreadPools.getPoolBuilder("Conditional Writer 
Cleanup Thread")
+          .numCoreThreads(1).withTimeOut(3L, 
SECONDS).enableThreadPoolMetrics().build();
     }
     this.cleanupThreadPool.execute(r);
   }
diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
index 3d6f1f2cf2..e9802300a3 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
@@ -371,7 +371,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
     this.auths = config.getAuthorizations();
     this.ve = new VisibilityEvaluator(config.getAuthorizations());
     this.threadPool = context.threadPools().createScheduledExecutorService(
-        config.getMaxWriteThreads(), this.getClass().getSimpleName(), false);
+        config.getMaxWriteThreads(), this.getClass().getSimpleName());
     this.locator = new SyncingTabletLocator(context, tableId);
     this.serverQueues = new HashMap<>();
     this.tableId = tableId;
diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
index 37229b88c0..084d59ef11 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
@@ -301,8 +301,8 @@ public class InstanceOperationsImpl implements 
InstanceOperations {
     List<String> tservers = getTabletServers();
 
     int numThreads = Math.max(4, Math.min((tservers.size() + 
compactors.size()) / 10, 256));
-    var executorService =
-        context.threadPools().createFixedThreadPool(numThreads, 
"getactivecompactions", false);
+    var executorService = 
context.threadPools().getPoolBuilder("getactivecompactions")
+        .numCoreThreads(numThreads).build();
     try {
       List<Future<List<ActiveCompaction>>> futures = new ArrayList<>();
 
diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
index 16f1d92284..129cb6a681 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
@@ -495,7 +495,8 @@ public class TableOperationsImpl extends 
TableOperationsHelper {
     CountDownLatch latch = new CountDownLatch(splits.size());
     AtomicReference<Exception> exception = new AtomicReference<>(null);
 
-    ExecutorService executor = context.threadPools().createFixedThreadPool(16, 
"addSplits", false);
+    ExecutorService executor =
+        
context.threadPools().getPoolBuilder("addSplits").numCoreThreads(16).build();
     try {
       executor.execute(
           new SplitTask(new SplitEnv(tableName, tableId, executor, latch, 
exception), splits));
diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java
 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java
index 5c2f6229e8..23f40e9be3 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java
@@ -71,8 +71,9 @@ public class TabletServerBatchReader extends ScannerOptions 
implements BatchScan
     this.tableName = tableName;
     this.numThreads = numQueryThreads;
 
-    queryThreadPool = 
context.threadPools().createFixedThreadPool(numQueryThreads,
-        "batch scanner " + batchReaderInstance + "-", false);
+    queryThreadPool =
+        context.threadPools().getPoolBuilder("batch scanner " + 
batchReaderInstance + "-")
+            .numCoreThreads(numQueryThreads).build();
     // Call shutdown on this thread pool in case the caller does not call 
close().
     cleanable = CleanerUtil.shutdownThreadPoolExecutor(queryThreadPool, 
closed, log);
   }
diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
index c4461a3d71..980ba0408a 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
@@ -672,11 +672,11 @@ public class TabletServerBatchWriter implements 
AutoCloseable {
     public MutationWriter(int numSendThreads) {
       serversMutations = new HashMap<>();
       queued = new HashSet<>();
-      sendThreadPool = 
context.threadPools().createFixedThreadPool(numSendThreads,
-          this.getClass().getName(), false);
+      sendThreadPool = 
context.threadPools().getPoolBuilder(this.getClass().getName())
+          .numCoreThreads(numSendThreads).build();
       locators = new HashMap<>();
-      binningThreadPool = context.threadPools().createFixedThreadPool(1, 
"BinMutations",
-          new SynchronousQueue<>(), false);
+      binningThreadPool = 
context.threadPools().getPoolBuilder("BinMutations").numCoreThreads(1)
+          .withQueue(new SynchronousQueue<>()).build();
       binningThreadPool.setRejectedExecutionHandler(new 
ThreadPoolExecutor.CallerRunsPolicy());
     }
 
diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java
index 4810048e93..f13420d006 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java
@@ -482,12 +482,12 @@ public class BulkImport implements 
ImportDestinationArguments, ImportMappingOpti
     if (this.executor != null) {
       executor = this.executor;
     } else if (numThreads > 0) {
-      executor = service =
-          context.threadPools().createFixedThreadPool(numThreads, 
"BulkImportThread", false);
+      executor = service = 
context.threadPools().getPoolBuilder("BulkImportThread")
+          .numCoreThreads(numThreads).build();
     } else {
       String threads = 
context.getConfiguration().get(ClientProperty.BULK_LOAD_THREADS.getKey());
-      executor = service = context.threadPools().createFixedThreadPool(
-          ConfigurationTypeHelper.getNumThreads(threads), "BulkImportThread", 
false);
+      executor = service = 
context.threadPools().getPoolBuilder("BulkImportThread")
+          
.numCoreThreads(ConfigurationTypeHelper.getNumThreads(threads)).build();
     }
 
     try {
diff --git 
a/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java 
b/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
index 39cd7d2d13..318f87a88e 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
@@ -79,10 +79,9 @@ public class BloomFilterLayer {
     }
 
     if (maxLoadThreads > 0) {
-      loadThreadPool = ThreadPools.getServerThreadPools().createThreadPool(0, 
maxLoadThreads, 60,
-          SECONDS, "bloom-loader", false);
+      loadThreadPool = 
ThreadPools.getServerThreadPools().getPoolBuilder("bloom-loader")
+          .numCoreThreads(0).numMaxThreads(maxLoadThreads).withTimeOut(60L, 
SECONDS).build();
     }
-
     return loadThreadPool;
   }
 
diff --git 
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java
 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java
index 01972f8ffe..0183ebe3bb 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java
@@ -102,8 +102,8 @@ public class LruBlockCache extends 
SynchronousLoadingBlockCache implements Block
   private final EvictionThread evictionThread;
 
   /** Statistics thread schedule pool (for heavy debugging, could remove) */
-  private final ScheduledExecutorService scheduleThreadPool = 
ThreadPools.getServerThreadPools()
-      .createScheduledExecutorService(1, "LRUBlockCacheStats", false);
+  private final ScheduledExecutorService scheduleThreadPool =
+      ThreadPools.getServerThreadPools().createScheduledExecutorService(1, 
"LRUBlockCacheStats");
 
   /** Current size of cache */
   private final AtomicLong size;
diff --git 
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java
 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java
index 04cab1a38c..46a07682bd 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java
@@ -62,7 +62,7 @@ public final class TinyLfuBlockCache implements BlockCache {
   private final Policy.Eviction<String,Block> policy;
   private final int maxSize;
   private final ScheduledExecutorService statsExecutor = 
ThreadPools.getServerThreadPools()
-      .createScheduledExecutorService(1, "TinyLfuBlockCacheStatsExecutor", 
false);
+      .createScheduledExecutorService(1, "TinyLfuBlockCacheStatsExecutor");
 
   public TinyLfuBlockCache(Configuration conf, CacheType type) {
     cache = Caffeine.newBuilder()
diff --git 
a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
 
b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
index 5d0107bfb0..a974836ab0 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
@@ -222,9 +222,8 @@ public class ExternalCompactionUtil {
    */
   public static List<RunningCompaction> 
getCompactionsRunningOnCompactors(ClientContext context) {
     final List<RunningCompactionFuture> rcFutures = new ArrayList<>();
-    final ExecutorService executor = 
ThreadPools.getServerThreadPools().createFixedThreadPool(16,
-        "CompactorRunningCompactions", false);
-
+    final ExecutorService executor = ThreadPools.getServerThreadPools()
+        
.getPoolBuilder("CompactorRunningCompactions").numCoreThreads(16).build();
     getCompactorAddrs(context).forEach((q, hp) -> {
       hp.forEach(hostAndPort -> {
         rcFutures.add(new RunningCompactionFuture(q, hostAndPort,
@@ -250,9 +249,8 @@ public class ExternalCompactionUtil {
 
   public static Collection<ExternalCompactionId>
       getCompactionIdsRunningOnCompactors(ClientContext context) {
-    final ExecutorService executor = 
ThreadPools.getServerThreadPools().createFixedThreadPool(16,
-        "CompactorRunningCompactions", false);
-
+    final ExecutorService executor = ThreadPools.getServerThreadPools()
+        
.getPoolBuilder("CompactorRunningCompactions").numCoreThreads(16).build();
     List<Future<ExternalCompactionId>> futures = new ArrayList<>();
 
     getCompactorAddrs(context).forEach((q, hp) -> {
diff --git 
a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java 
b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
index 7206118ce4..147f19e5dd 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
@@ -19,6 +19,7 @@
 package org.apache.accumulo.core.util.threads;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
 import static java.util.concurrent.TimeUnit.SECONDS;
 
 import java.lang.Thread.UncaughtExceptionHandler;
@@ -43,9 +44,12 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.metrics.MetricsUtil;
 import org.apache.accumulo.core.trace.TraceUtil;
+import org.checkerframework.checker.nullness.qual.NonNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 
 @SuppressFBWarnings(value = "RV_EXCEPTION_NOT_THROWN",
@@ -64,7 +68,7 @@ public class ThreadPools {
   private static final Logger LOG = LoggerFactory.getLogger(ThreadPools.class);
 
   // the number of seconds before we allow a thread to terminate with non-use.
-  public static final long DEFAULT_TIMEOUT_MILLISECS = 180000L;
+  public static final long DEFAULT_TIMEOUT_MILLISECS = MINUTES.toMillis(3);
 
   private static final ThreadPools SERVER_INSTANCE = new 
ThreadPools(Threads.UEH);
 
@@ -77,7 +81,7 @@ public class ThreadPools {
   }
 
   private static final ThreadPoolExecutor SCHEDULED_FUTURE_CHECKER_POOL =
-      getServerThreadPools().createFixedThreadPool(1, "Scheduled Future 
Checker", false);
+      getServerThreadPools().getPoolBuilder("Scheduled Future 
Checker").numCoreThreads(1).build();
 
   private static final ConcurrentLinkedQueue<ScheduledFuture<?>> 
CRITICAL_RUNNING_TASKS =
       new ConcurrentLinkedQueue<>();
@@ -85,7 +89,7 @@ public class ThreadPools {
   private static final ConcurrentLinkedQueue<ScheduledFuture<?>> 
NON_CRITICAL_RUNNING_TASKS =
       new ConcurrentLinkedQueue<>();
 
-  private static Runnable TASK_CHECKER = new Runnable() {
+  private static final Runnable TASK_CHECKER = new Runnable() {
     @Override
     public void run() {
       final List<ConcurrentLinkedQueue<ScheduledFuture<?>>> queues =
@@ -100,7 +104,7 @@ public class ThreadPools {
           }
         });
         try {
-          TimeUnit.MINUTES.sleep(1);
+          MINUTES.sleep(1);
         } catch (InterruptedException ie) {
           // This thread was interrupted by something while sleeping. We don't 
want to exit
           // this thread, so reset the interrupt state on this thread and keep 
going.
@@ -232,6 +236,20 @@ public class ThreadPools {
     handler = ueh;
   }
 
+  /**
+   * Create a thread pool based on a thread pool related property. The pool 
will not be instrumented
+   * with additional metrics. This method should be preferred, especially 
for short-lived pools.
+   *
+   * @param conf accumulo configuration
+   * @param p thread pool related property
+   * @return ExecutorService impl
+   * @throws IllegalArgumentException if property is not handled
+   */
+  public ThreadPoolExecutor createExecutorService(final AccumuloConfiguration 
conf,
+      final Property p) {
+    return createExecutorService(conf, p, false);
+  }
+
   /**
    * Create a thread pool based on a thread pool related property
    *
@@ -239,173 +257,226 @@ public class ThreadPools {
    * @param p thread pool related property
    * @param emitThreadPoolMetrics When set to true will emit metrics and 
register the metrics in a
    *        static registry. After the thread pool is deleted, there will 
still be metrics objects
-   *        related to it in the static registry. There is no way to clean 
these left over objects
-   *        up therefore its recommended that this option only be set true for 
long lived thread
-   *        pools. Creating lots of short lived thread pools and registering 
them can lead to out of
-   *        memory errors over long time periods.
+   *        related to it in the static registry. There is no way to clean 
these leftover objects up
+   *        therefore its recommended that this option only be set true for 
long-lived thread pools.
+   *        Creating lots of short-lived thread pools and registering them can 
lead to out of memory
+   *        errors over long time periods.
    * @return ExecutorService impl
-   * @throws RuntimeException if property is not handled
+   * @throws IllegalArgumentException if property is not handled
    */
   @SuppressWarnings("deprecation")
   public ThreadPoolExecutor createExecutorService(final AccumuloConfiguration 
conf,
       final Property p, boolean emitThreadPoolMetrics) {
-
+    ThreadPoolExecutorBuilder builder;
     switch (p) {
       case GENERAL_SIMPLETIMER_THREADPOOL_SIZE:
-        return createScheduledExecutorService(conf.getCount(p), "SimpleTimer",
-            emitThreadPoolMetrics);
+        return createScheduledExecutorService(conf.getCount(p), "SimpleTimer");
       case GENERAL_THREADPOOL_SIZE:
         return createScheduledExecutorService(conf.getCount(p), 
"GeneralExecutor",
             emitThreadPoolMetrics);
       case MANAGER_BULK_THREADPOOL_SIZE:
-        return createFixedThreadPool(conf.getCount(p),
-            conf.getTimeInMillis(Property.MANAGER_BULK_THREADPOOL_TIMEOUT), 
MILLISECONDS,
-            "bulk import", emitThreadPoolMetrics);
+        builder = getPoolBuilder("bulk 
import").numCoreThreads(conf.getCount(p)).withTimeOut(
+            conf.getTimeInMillis(Property.MANAGER_BULK_THREADPOOL_TIMEOUT), 
MILLISECONDS);
+        if (emitThreadPoolMetrics) {
+          builder.enableThreadPoolMetrics();
+        }
+        return builder.build();
       case MANAGER_RENAME_THREADS:
-        return createFixedThreadPool(conf.getCount(p), "bulk move", 
emitThreadPoolMetrics);
+        builder = getPoolBuilder("bulk move").numCoreThreads(conf.getCount(p));
+        if (emitThreadPoolMetrics) {
+          builder.enableThreadPoolMetrics();
+        }
+        return builder.build();
       case MANAGER_FATE_THREADPOOL_SIZE:
-        return createFixedThreadPool(conf.getCount(p), "Repo Runner", 
emitThreadPoolMetrics);
+        builder = getPoolBuilder("Repo 
Runner").numCoreThreads(conf.getCount(p));
+        if (emitThreadPoolMetrics) {
+          builder.enableThreadPoolMetrics();
+        }
+        return builder.build();
       case MANAGER_STATUS_THREAD_POOL_SIZE:
+        builder = getPoolBuilder("GatherTableInformation");
         int threads = conf.getCount(p);
         if (threads == 0) {
-          return createThreadPool(0, Integer.MAX_VALUE, 60L, SECONDS, 
"GatherTableInformation",
-              new SynchronousQueue<>(), emitThreadPoolMetrics);
+          
builder.numCoreThreads(0).numMaxThreads(Integer.MAX_VALUE).withTimeOut(60L, 
SECONDS)
+              .withQueue(new SynchronousQueue<>());
         } else {
-          return createFixedThreadPool(threads, "GatherTableInformation", 
emitThreadPoolMetrics);
+          builder.numCoreThreads(threads);
+        }
+        if (emitThreadPoolMetrics) {
+          builder.enableThreadPoolMetrics();
         }
+        return builder.build();
       case TSERV_WORKQ_THREADS:
-        return createFixedThreadPool(conf.getCount(p), "distributed work 
queue",
-            emitThreadPoolMetrics);
+        builder = getPoolBuilder("distributed work 
queue").numCoreThreads(conf.getCount(p));
+        if (emitThreadPoolMetrics) {
+          builder.enableThreadPoolMetrics();
+        }
+        return builder.build();
       case TSERV_MINC_MAXCONCURRENT:
-        return createFixedThreadPool(conf.getCount(p), 0L, MILLISECONDS, 
"minor compactor",
-            emitThreadPoolMetrics);
+        builder = getPoolBuilder("minor 
compactor").numCoreThreads(conf.getCount(p)).withTimeOut(0L,
+            MILLISECONDS);
+        if (emitThreadPoolMetrics) {
+          builder.enableThreadPoolMetrics();
+        }
+        return builder.build();
       case TSERV_MIGRATE_MAXCONCURRENT:
-        return createFixedThreadPool(conf.getCount(p), 0L, MILLISECONDS, 
"tablet migration",
-            emitThreadPoolMetrics);
+        builder = getPoolBuilder("tablet 
migration").numCoreThreads(conf.getCount(p))
+            .withTimeOut(0L, MILLISECONDS);
+        if (emitThreadPoolMetrics) {
+          builder.enableThreadPoolMetrics();
+        }
+        return builder.build();
       case TSERV_ASSIGNMENT_MAXCONCURRENT:
-        return createFixedThreadPool(conf.getCount(p), 0L, MILLISECONDS, 
"tablet assignment",
-            emitThreadPoolMetrics);
+        builder = getPoolBuilder("tablet 
assignment").numCoreThreads(conf.getCount(p))
+            .withTimeOut(0L, MILLISECONDS);
+        if (emitThreadPoolMetrics) {
+          builder.enableThreadPoolMetrics();
+        }
+        return builder.build();
       case TSERV_SUMMARY_RETRIEVAL_THREADS:
-        return createThreadPool(conf.getCount(p), conf.getCount(p), 60, 
SECONDS,
-            "summary file retriever", emitThreadPoolMetrics);
+        builder = getPoolBuilder("summary file 
retriever").numCoreThreads(conf.getCount(p))
+            .withTimeOut(60L, MILLISECONDS);
+        if (emitThreadPoolMetrics) {
+          builder.enableThreadPoolMetrics();
+        }
+        return builder.build();
       case TSERV_SUMMARY_REMOTE_THREADS:
-        return createThreadPool(conf.getCount(p), conf.getCount(p), 60, 
SECONDS, "summary remote",
-            emitThreadPoolMetrics);
+        builder = getPoolBuilder("summary 
remote").numCoreThreads(conf.getCount(p)).withTimeOut(60L,
+            MILLISECONDS);
+        if (emitThreadPoolMetrics) {
+          builder.enableThreadPoolMetrics();
+        }
+        return builder.build();
       case TSERV_SUMMARY_PARTITION_THREADS:
-        return createThreadPool(conf.getCount(p), conf.getCount(p), 60, 
SECONDS,
-            "summary partition", emitThreadPoolMetrics);
+        builder = getPoolBuilder("summary 
partition").numCoreThreads(conf.getCount(p))
+            .withTimeOut(60L, MILLISECONDS);
+        if (emitThreadPoolMetrics) {
+          builder.enableThreadPoolMetrics();
+        }
+        return builder.build();
       case GC_DELETE_THREADS:
-        return createFixedThreadPool(conf.getCount(p), "deleting", 
emitThreadPoolMetrics);
+        return 
getPoolBuilder("deleting").numCoreThreads(conf.getCount(p)).build();
       case REPLICATION_WORKER_THREADS:
-        return createFixedThreadPool(conf.getCount(p), "replication task", 
emitThreadPoolMetrics);
+        builder = getPoolBuilder("replication 
task").numCoreThreads(conf.getCount(p));
+        if (emitThreadPoolMetrics) {
+          builder.enableThreadPoolMetrics();
+        }
+        return builder.build();
+
       default:
-        throw new RuntimeException("Unhandled thread pool property: " + p);
+        throw new IllegalArgumentException("Unhandled thread pool property: " 
+ p);
     }
   }
 
-  /**
-   * Create a named thread pool
-   *
-   * @param numThreads number of threads
-   * @param name thread pool name
-   * @param emitThreadPoolMetrics When set to true will emit metrics and 
register the metrics in a
-   *        static registry. After the thread pool is deleted, there will 
still be metrics objects
-   *        related to it in the static registry. There is no way to clean 
these left over objects
-   *        up therefore its recommended that this option only be set true for 
long lived thread
-   *        pools. Creating lots of short lived thread pools and registering 
them can lead to out of
-   *        memory errors over long time periods.
-   * @return ThreadPoolExecutor
-   */
-  public ThreadPoolExecutor createFixedThreadPool(int numThreads, final String 
name,
-      boolean emitThreadPoolMetrics) {
-    return createFixedThreadPool(numThreads, DEFAULT_TIMEOUT_MILLISECS, 
MILLISECONDS, name,
-        emitThreadPoolMetrics);
+  public ThreadPoolExecutorBuilder getPoolBuilder(@NonNull final String name) {
+    return new ThreadPoolExecutorBuilder(name);
   }
 
-  /**
-   * Create a named thread pool
-   *
-   * @param numThreads number of threads
-   * @param name thread pool name
-   * @param queue queue to use for tasks
-   * @param emitThreadPoolMetrics When set to true will emit metrics and 
register the metrics in a
-   *        static registry. After the thread pool is deleted, there will 
still be metrics objects
-   *        related to it in the static registry. There is no way to clean 
these left over objects
-   *        up therefore its recommended that this option only be set true for 
long lived thread
-   *        pools. Creating lots of short lived thread pools and registering 
them can lead to out of
-   *        memory errors over long time periods.
-   * @return ThreadPoolExecutor
-   */
-  public ThreadPoolExecutor createFixedThreadPool(int numThreads, final String 
name,
-      BlockingQueue<Runnable> queue, boolean emitThreadPoolMetrics) {
-    return createThreadPool(numThreads, numThreads, DEFAULT_TIMEOUT_MILLISECS, 
MILLISECONDS, name,
-        queue, emitThreadPoolMetrics);
-  }
+  public class ThreadPoolExecutorBuilder {
+    final String name;
+    int coreThreads = 0;
+    int maxThreads = -1;
+    long timeOut = DEFAULT_TIMEOUT_MILLISECS;
+    TimeUnit units = MILLISECONDS;
+    BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
+    OptionalInt priority = OptionalInt.empty();
+    boolean emitThreadPoolMetrics = false;
+
+    /**
+     * A fluent-style build to create a ThreadPoolExecutor. The name is used 
when creating
+     * named-threads for the pool.
+     */
+    ThreadPoolExecutorBuilder(@NonNull final String name) {
+      this.name = name;
+    }
 
-  /**
-   * Create a named thread pool
-   *
-   * @param numThreads number of threads
-   * @param timeOut core thread time out
-   * @param units core thread time out units
-   * @param name thread pool name
-   * @param emitThreadPoolMetrics When set to true will emit metrics and 
register the metrics in a
-   *        static registry. After the thread pool is deleted, there will 
still be metrics objects
-   *        related to it in the static registry. There is no way to clean 
these left over objects
-   *        up therefore its recommended that this option only be set true for 
long lived thread
-   *        pools. Creating lots of short lived thread pools and registering 
them can lead to out of
-   *        memory errors over long time periods.
-   * @return ThreadPoolExecutor
-   */
-  public ThreadPoolExecutor createFixedThreadPool(int numThreads, long 
timeOut, TimeUnit units,
-      final String name, boolean emitThreadPoolMetrics) {
-    return createThreadPool(numThreads, numThreads, timeOut, units, name, 
emitThreadPoolMetrics);
-  }
+    public ThreadPoolExecutor build() {
+      Preconditions.checkArgument(coreThreads >= 0,
+          "The number of core threads must be 0 or larger");
+      if (maxThreads < 0) {
+        // create a fixed pool with maxThread = coreThreads if core threads 
set.
+        maxThreads = coreThreads == 0 ? 1 : coreThreads;
+      }
+      Preconditions.checkArgument(maxThreads >= coreThreads,
+          "The number of max threads must be greater than 0 and greater than 
or equal to the number of core threads");
+      Preconditions.checkArgument(
+          priority.orElse(1) >= Thread.MIN_PRIORITY && priority.orElse(1) <= 
Thread.MAX_PRIORITY,
+          "invalid thread priority, range must be Thread.MIN_PRIORITY <= 
priority <= Thread.MAX_PRIORITY");
+
+      return createThreadPool(coreThreads, maxThreads, timeOut, units, name, 
queue, priority,
+          emitThreadPoolMetrics);
+    }
 
-  /**
-   * Create a named thread pool
-   *
-   * @param coreThreads number of threads
-   * @param maxThreads max number of threads
-   * @param timeOut core thread time out
-   * @param units core thread time out units
-   * @param name thread pool name
-   * @param emitThreadPoolMetrics When set to true will emit metrics and 
register the metrics in a
-   *        static registry. After the thread pool is deleted, there will 
still be metrics objects
-   *        related to it in the static registry. There is no way to clean 
these left over objects
-   *        up therefore its recommended that this option only be set true for 
long lived thread
-   *        pools. Creating lots of short lived thread pools and registering 
them can lead to out of
-   *        memory errors over long time periods.
-   * @return ThreadPoolExecutor
-   */
-  public ThreadPoolExecutor createThreadPool(int coreThreads, int maxThreads, 
long timeOut,
-      TimeUnit units, final String name, boolean emitThreadPoolMetrics) {
-    return createThreadPool(coreThreads, maxThreads, timeOut, units, name,
-        new LinkedBlockingQueue<>(), emitThreadPoolMetrics);
-  }
+    /**
+     * Set the number of coreThreads. See {@link 
java.util.concurrent.ThreadPoolExecutor}
+     *
+     * @param coreThreads the number of core threads, must be 0 or larger.
+     * @return fluent-style builder instance
+     */
+    public ThreadPoolExecutorBuilder numCoreThreads(int coreThreads) {
+      this.coreThreads = coreThreads;
+      return this;
+    }
 
-  /**
-   * Create a named thread pool
-   *
-   * @param coreThreads number of threads
-   * @param maxThreads max number of threads
-   * @param timeOut core thread time out
-   * @param units core thread time out units
-   * @param name thread pool name
-   * @param queue queue to use for tasks
-   * @param emitThreadPoolMetrics When set to true will emit metrics and 
register the metrics in a
-   *        static registry. After the thread pool is deleted, there will 
still be metrics objects
-   *        related to it in the static registry. There is no way to clean 
these left over objects
-   *        up therefore its recommended that this option only be set true for 
long lived thread
-   *        pools. Creating lots of short lived thread pools and registering 
them can lead to out of
-   *        memory errors over long time periods.
-   * @return ThreadPoolExecutor
-   */
-  public ThreadPoolExecutor createThreadPool(int coreThreads, int maxThreads, 
long timeOut,
-      TimeUnit units, final String name, BlockingQueue<Runnable> queue,
-      boolean emitThreadPoolMetrics) {
-    return createThreadPool(coreThreads, maxThreads, timeOut, units, name, 
queue,
-        OptionalInt.empty(), emitThreadPoolMetrics);
+    /**
+     * Set the maximum number of threads in the pool. See
+     * {@link java.util.concurrent.ThreadPoolExecutor}. If the maxThreads is 
not set, defaults to
+     * the number of core threads (if set) resulting in a fixed pool. If the 
number of core threads
+     * is not set, defaults to a single thread.
+     *
+     * @param maxThreads max number of threads. Must be greater than 0 and 
equal or greater than the
+     *        number of core threads.
+     *
+     * @return fluent-style builder instance
+     */
+    public ThreadPoolExecutorBuilder numMaxThreads(int maxThreads) {
+      this.maxThreads = maxThreads;
+      return this;
+    }
+
+    /**
+     * Set the thread keep-alive time. See {@link 
java.util.concurrent.ThreadPoolExecutor}
+     *
+     * @param timeOut the thread keep alive time.
+     * @param units the keep alive time units.
+     * @return fluent-style builder instance
+     */
+    public ThreadPoolExecutorBuilder withTimeOut(long timeOut, @NonNull 
TimeUnit units) {
+      this.timeOut = timeOut;
+      this.units = units;
+      return this;
+    }
+
+    /**
+     * Set the queue that will hold runnable tasks before execution. See
+     * {@link java.util.concurrent.ThreadPoolExecutor}
+     *
+     * @param queue the work queue used to hold tasks before they are executed.
+     * @return fluent-style builder instance
+     */
+    public ThreadPoolExecutorBuilder withQueue(@NonNull final 
BlockingQueue<Runnable> queue) {
+      this.queue = queue;
+      return this;
+    }
+
+    public ThreadPoolExecutorBuilder atPriority(@NonNull final OptionalInt 
priority) {
+      this.priority = priority;
+      return this;
+    }
+
+    /**
+     * When set to true will emit metrics and register the metrics in a static 
registry. After the
+     * thread pool is deleted, there will still be metrics objects related to 
it in the static
+     * registry. There is no way to clean these leftover objects up therefore 
its recommended that
+     * this option only be set true for long-lived thread pools. Creating lots 
of short-lived thread
+     * pools and registering them can lead to out of memory errors over long 
time periods.
+     *
+     * @return a fluent-style builder instance
+     */
+    public ThreadPoolExecutorBuilder enableThreadPoolMetrics() {
+      this.emitThreadPoolMetrics = true;
+      return this;
+    }
   }
 
   /**
@@ -420,15 +491,16 @@ public class ThreadPools {
    * @param priority thread priority
    * @param emitThreadPoolMetrics When set to true will emit metrics and 
register the metrics in a
    *        static registry. After the thread pool is deleted, there will 
still be metrics objects
-   *        related to it in the static registry. There is no way to clean 
these left over objects
-   *        up therefore its recommended that this option only be set true for 
long lived thread
-   *        pools. Creating lots of short lived thread pools and registering 
them can lead to out of
-   *        memory errors over long time periods.
+   *        related to it in the static registry. There is no way to clean 
these leftover objects up
+   *        therefore its recommended that this option only be set true for 
long-lived thread pools.
+   *        Creating lots of short-lived thread pools and registering them can 
lead to out of memory
+   *        errors over long time periods.
    * @return ThreadPoolExecutor
    */
-  public ThreadPoolExecutor createThreadPool(int coreThreads, int maxThreads, 
long timeOut,
-      TimeUnit units, final String name, BlockingQueue<Runnable> queue, 
OptionalInt priority,
-      boolean emitThreadPoolMetrics) {
+  private ThreadPoolExecutor createThreadPool(final int coreThreads, final int 
maxThreads,
+      final long timeOut, final TimeUnit units, final String name,
+      final BlockingQueue<Runnable> queue, final OptionalInt priority,
+      final boolean emitThreadPoolMetrics) {
     LOG.trace(
         "Creating ThreadPoolExecutor for {} with {} core threads and {} max 
threads {} {} timeout",
         name, coreThreads, maxThreads, timeOut, units);
@@ -436,7 +508,7 @@ public class ThreadPools {
         new NamedThreadFactory(name, priority, handler)) {
 
       @Override
-      public void execute(Runnable arg0) {
+      public void execute(@NonNull Runnable arg0) {
         super.execute(TraceUtil.wrap(arg0));
       }
 
@@ -446,17 +518,20 @@ public class ThreadPools {
       }
 
       @Override
-      public <T> Future<T> submit(Callable<T> task) {
+      @NonNull
+      public <T> Future<T> submit(@NonNull Callable<T> task) {
         return super.submit(TraceUtil.wrap(task));
       }
 
       @Override
-      public <T> Future<T> submit(Runnable task, T result) {
+      @NonNull
+      public <T> Future<T> submit(@NonNull Runnable task, T result) {
         return super.submit(TraceUtil.wrap(task), result);
       }
 
       @Override
-      public Future<?> submit(Runnable task) {
+      @NonNull
+      public Future<?> submit(@NonNull Runnable task) {
         return super.submit(TraceUtil.wrap(task));
       }
     };
@@ -481,6 +556,19 @@ public class ThreadPools {
     return (ScheduledThreadPoolExecutor) createExecutorService(conf, prop, 
true);
   }
 
+  /**
+   * Create a named ScheduledThreadPool. The pool will not be instrumented 
with additional
+   * metrics. This method should be preferred, especially for short-lived 
pools.
+   *
+   * @param numThreads number of threads
+   * @param name thread pool name
+   * @return ScheduledThreadPoolExecutor
+   */
+  public ScheduledThreadPoolExecutor createScheduledExecutorService(int 
numThreads,
+      final String name) {
+    return createScheduledExecutorService(numThreads, name, false);
+  }
+
   /**
    * Create a named ScheduledThreadPool
    *
@@ -488,57 +576,66 @@ public class ThreadPools {
    * @param name thread pool name
    * @param emitThreadPoolMetrics When set to true will emit metrics and 
register the metrics in a
    *        static registry. After the thread pool is deleted, there will 
still be metrics objects
-   *        related to it in the static registry. There is no way to clean 
these left over objects
-   *        up therefore its recommended that this option only be set true for 
long lived thread
-   *        pools. Creating lots of short lived thread pools and registering 
them can lead to out of
-   *        memory errors over long time periods.
+   *        related to it in the static registry. There is no way to clean 
these leftover objects up
+   *        therefore its recommended that this option only be set true for 
long-lived thread pools.
+   *        Creating lots of short-lived thread pools and registering them can 
lead to out of memory
+   *        errors over long time periods.
    * @return ScheduledThreadPoolExecutor
    */
-  public ScheduledThreadPoolExecutor createScheduledExecutorService(int 
numThreads,
+  private ScheduledThreadPoolExecutor createScheduledExecutorService(int 
numThreads,
       final String name, boolean emitThreadPoolMetrics) {
     LOG.trace("Creating ScheduledThreadPoolExecutor for {} with {} threads", 
name, numThreads);
     var result =
         new ScheduledThreadPoolExecutor(numThreads, new 
NamedThreadFactory(name, handler)) {
 
           @Override
-          public void execute(Runnable command) {
+          public void execute(@NonNull Runnable command) {
             super.execute(TraceUtil.wrap(command));
           }
 
           @Override
-          public <V> ScheduledFuture<V> schedule(Callable<V> callable, long 
delay, TimeUnit unit) {
+          @NonNull
+          public <V> ScheduledFuture<V> schedule(@NonNull Callable<V> 
callable, long delay,
+              @NonNull TimeUnit unit) {
             return super.schedule(TraceUtil.wrap(callable), delay, unit);
           }
 
           @Override
-          public ScheduledFuture<?> schedule(Runnable command, long delay, 
TimeUnit unit) {
+          @NonNull
+          public ScheduledFuture<?> schedule(@NonNull Runnable command, long 
delay,
+              @NonNull TimeUnit unit) {
             return super.schedule(TraceUtil.wrap(command), delay, unit);
           }
 
           @Override
-          public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long 
initialDelay,
-              long period, TimeUnit unit) {
+          @NonNull
+          public ScheduledFuture<?> scheduleAtFixedRate(@NonNull Runnable 
command,
+              long initialDelay, long period, @NonNull TimeUnit unit) {
             return super.scheduleAtFixedRate(TraceUtil.wrap(command), 
initialDelay, period, unit);
           }
 
           @Override
-          public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, 
long initialDelay,
-              long delay, TimeUnit unit) {
+          @NonNull
+          public ScheduledFuture<?> scheduleWithFixedDelay(@NonNull Runnable 
command,
+              long initialDelay, long delay, @NonNull TimeUnit unit) {
             return super.scheduleWithFixedDelay(TraceUtil.wrap(command), 
initialDelay, delay, unit);
           }
 
           @Override
-          public <T> Future<T> submit(Callable<T> task) {
+          @NonNull
+          public <T> Future<T> submit(@NonNull Callable<T> task) {
             return super.submit(TraceUtil.wrap(task));
           }
 
           @Override
-          public <T> Future<T> submit(Runnable task, T result) {
+          @NonNull
+          public <T> Future<T> submit(@NonNull Runnable task, T result) {
             return super.submit(TraceUtil.wrap(task), result);
           }
 
           @Override
-          public Future<?> submit(Runnable task) {
+          @NonNull
+          public Future<?> submit(@NonNull Runnable task) {
             return super.submit(TraceUtil.wrap(task));
           }
 
diff --git 
a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
 
b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
index 6f69dc4e94..ea0c4ceabe 100644
--- 
a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
+++ 
b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
@@ -19,7 +19,7 @@
 package org.apache.accumulo.core.file.rfile;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -232,8 +232,9 @@ public class MultiThreadedRFileTest {
       // now start up multiple RFile deepcopies
       int maxThreads = 10;
       String name = "MultiThreadedRFileTestThread";
-      ThreadPoolExecutor pool = 
ThreadPools.getServerThreadPools().createThreadPool(maxThreads + 1,
-          maxThreads + 1, 5 * 60, SECONDS, name, false);
+      ThreadPoolExecutor pool =
+          
ThreadPools.getServerThreadPools().getPoolBuilder(name).numCoreThreads(maxThreads
 + 1)
+              .numMaxThreads(maxThreads + 1).withTimeOut(5, MINUTES).build();
       try {
         Runnable runnable = () -> {
           try {
diff --git 
a/core/src/test/java/org/apache/accumulo/core/util/threads/ThreadPoolExecutorBuilderTest.java
 
b/core/src/test/java/org/apache/accumulo/core/util/threads/ThreadPoolExecutorBuilderTest.java
new file mode 100644
index 0000000000..5146ccf5b2
--- /dev/null
+++ 
b/core/src/test/java/org/apache/accumulo/core/util/threads/ThreadPoolExecutorBuilderTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.util.threads;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.junit.jupiter.api.Test;
+
+public class ThreadPoolExecutorBuilderTest {
+
+  private final ThreadPools serverPool = ThreadPools.getServerThreadPools();
+
+  @Test
+  public void builderDefaultsTest() {
+    var p = serverPool.getPoolBuilder("defaults").build();
+    assertEquals(0, p.getCorePoolSize());
+    assertEquals(1, p.getMaximumPoolSize());
+    assertEquals(3L, p.getKeepAliveTime(MINUTES));
+  }
+
+  @Test
+  public void builderInvalidNumCoreTest() {
+    assertThrows(IllegalArgumentException.class,
+        () -> serverPool.getPoolBuilder("test1").numCoreThreads(-1).build());
+  }
+
+  @Test
+  public void builderInvalidNumMaxThreadsTest() {
+    // max threads must be >= core threads
+    assertThrows(IllegalArgumentException.class,
+        () -> 
serverPool.getPoolBuilder("test1").numCoreThreads(2).numMaxThreads(1).build());
+  }
+
+  @Test
+  public void builderPoolCoreMaxTest() {
+    var p = 
serverPool.getPoolBuilder("test1").numCoreThreads(1).numMaxThreads(2).build();
+    assertEquals(1, p.getCorePoolSize());
+    assertEquals(2, p.getMaximumPoolSize());
+  }
+
+  @Test
+  public void builderFixedPoolTest() {
+    var p = serverPool.getPoolBuilder("test1").numCoreThreads(2).build();
+    assertEquals(2, p.getCorePoolSize());
+    assertEquals(2, p.getMaximumPoolSize());
+  }
+
+  @Test
+  public void buildeSetTimeoutTest() {
+    var p = serverPool.getPoolBuilder("test1").withTimeOut(0L, 
MILLISECONDS).build();
+    assertEquals(0, p.getCorePoolSize());
+    assertEquals(1, p.getMaximumPoolSize());
+    assertEquals(0L, p.getKeepAliveTime(MINUTES));
+
+    var p2 = serverPool.getPoolBuilder("test1").withTimeOut(123L, 
MILLISECONDS).build();
+    assertEquals(0, p2.getCorePoolSize());
+    assertEquals(1, p2.getMaximumPoolSize());
+    assertEquals(123L, p2.getKeepAliveTime(MILLISECONDS));
+  }
+}
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java 
b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
index d1ac02443f..683ffaa058 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
@@ -135,7 +135,7 @@ public class BulkImporter {
 
       timer.start(Timers.EXAMINE_MAP_FILES);
       ExecutorService threadPool = ThreadPools.getServerThreadPools()
-          .createFixedThreadPool(numThreads, "findOverlapping", false);
+          
.getPoolBuilder("findOverlapping").numCoreThreads(numThreads).build();
 
       for (Path path : paths) {
         final Path mapFile = path;
@@ -362,8 +362,8 @@ public class BulkImporter {
 
     final Map<Path,List<AssignmentInfo>> ais = Collections.synchronizedMap(new 
TreeMap<>());
 
-    ExecutorService threadPool = ThreadPools.getServerThreadPools()
-        .createFixedThreadPool(numThreads, "estimateSizes", false);
+    ExecutorService threadPool = 
ThreadPools.getServerThreadPools().getPoolBuilder("estimateSizes")
+        .numCoreThreads(numThreads).build();
 
     for (final Entry<Path,List<TabletLocation>> entry : 
assignments.entrySet()) {
       if (entry.getValue().size() == 1) {
@@ -552,8 +552,8 @@ public class BulkImporter {
       }
     });
 
-    ExecutorService threadPool =
-        ThreadPools.getServerThreadPools().createFixedThreadPool(numThreads, 
"submit", false);
+    ExecutorService threadPool = 
ThreadPools.getServerThreadPools().getPoolBuilder("submit")
+        .numCoreThreads(numThreads).build();
 
     for (Entry<String,Map<KeyExtent,List<PathSize>>> entry : 
assignmentsPerTabletServer
         .entrySet()) {
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java
 
b/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java
index e5ac4945a3..d59abbbd3f 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java
@@ -191,8 +191,8 @@ public class ServerConfigurationFactory extends 
ServerConfiguration {
 
       Runnable refreshTask = this::verifySnapshotVersions;
 
-      ScheduledThreadPoolExecutor executor = ThreadPools.getServerThreadPools()
-          .createScheduledExecutorService(1, "config-refresh", false);
+      ScheduledThreadPoolExecutor executor =
+          ThreadPools.getServerThreadPools().createScheduledExecutorService(1, 
"config-refresh");
 
       // scheduleWithFixedDelay - used so only one task will run concurrently.
       // staggering the initial delay prevents synchronization of Accumulo 
servers communicating
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropCacheCaffeineImpl.java
 
b/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropCacheCaffeineImpl.java
index 48eb1fa181..5c42d0c0a3 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropCacheCaffeineImpl.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropCacheCaffeineImpl.java
@@ -18,6 +18,8 @@
  */
 package org.apache.accumulo.server.conf.store.impl;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
+
 import java.util.Objects;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
@@ -42,8 +44,9 @@ public class PropCacheCaffeineImpl implements PropCache {
 
   public static final int EXPIRE_MIN = 60;
   private static final Logger log = 
LoggerFactory.getLogger(PropCacheCaffeineImpl.class);
-  private static final Executor executor = 
ThreadPools.getServerThreadPools().createThreadPool(1,
-      20, 60, TimeUnit.SECONDS, "caffeine-tasks", false);
+  private static final Executor executor =
+      
ThreadPools.getServerThreadPools().getPoolBuilder("caffeine-tasks").numCoreThreads(1)
+          .numMaxThreads(20).withTimeOut(60L, SECONDS).build();
 
   private final PropStoreMetrics metrics;
 
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropStoreWatcher.java
 
b/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropStoreWatcher.java
index a58ea4db98..952409a2bb 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropStoreWatcher.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropStoreWatcher.java
@@ -57,9 +57,8 @@ public class PropStoreWatcher implements Watcher {
 
   private static final Logger log = 
LoggerFactory.getLogger(PropStoreWatcher.class);
 
-  private static final ExecutorService executorService =
-      ThreadPools.getServerThreadPools().createFixedThreadPool(2, 
"zoo_change_update", false);
-
+  private static final ExecutorService executorService = 
ThreadPools.getServerThreadPools()
+      .getPoolBuilder("zoo_change_update").numCoreThreads(2).build();
   private final ReentrantReadWriteLock listenerLock = new 
ReentrantReadWriteLock();
   private final ReentrantReadWriteLock.ReadLock listenerReadLock = 
listenerLock.readLock();
   private final ReentrantReadWriteLock.WriteLock listenerWriteLock = 
listenerLock.writeLock();
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
 
b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
index 51ba4d4c9e..0fa7d7b7d3 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
@@ -323,8 +323,8 @@ public class VolumeManagerImpl implements VolumeManager {
   public void bulkRename(Map<Path,Path> oldToNewPathMap, int poolSize, String 
poolName,
       String transactionId) throws IOException {
     List<Future<Void>> results = new ArrayList<>();
-    ExecutorService workerPool =
-        ThreadPools.getServerThreadPools().createFixedThreadPool(poolSize, 
poolName, false);
+    ExecutorService workerPool = 
ThreadPools.getServerThreadPools().getPoolBuilder(poolName)
+        .numCoreThreads(poolSize).build();
     oldToNewPathMap.forEach((oldPath, newPath) -> 
results.add(workerPool.submit(() -> {
       boolean success;
       try {
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java
 
b/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java
index cdbac5fc9f..2c04fc9e49 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java
@@ -18,6 +18,8 @@
  */
 package org.apache.accumulo.server.problems;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
+
 import java.util.Collections;
 import java.util.EnumMap;
 import java.util.Iterator;
@@ -65,8 +67,9 @@ public class ProblemReports implements 
Iterable<ProblemReport> {
    * processed because the whole system is in a really bad state (like HDFS is 
down) and everything
    * is reporting lots of problems, but problem reports can not be processed
    */
-  private ExecutorService reportExecutor = 
ThreadPools.getServerThreadPools().createThreadPool(0, 1,
-      60, TimeUnit.SECONDS, "acu-problem-reporter", new 
LinkedBlockingQueue<>(500), false);
+  private final ExecutorService reportExecutor = 
ThreadPools.getServerThreadPools()
+      
.getPoolBuilder("acu-problem-reporter").numCoreThreads(0).numMaxThreads(1)
+      .withTimeOut(60L, SECONDS).withQueue(new 
LinkedBlockingQueue<>(500)).build();
 
   private final ServerContext context;
 
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java 
b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
index 3a72250268..8ae472cf8b 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
@@ -19,6 +19,7 @@
 package org.apache.accumulo.server.rpc;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
 import java.io.IOException;
 import java.net.InetAddress;
@@ -32,7 +33,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -307,8 +307,9 @@ public class TServerUtils {
   private static ThreadPoolExecutor createSelfResizingThreadPool(final String 
serverName,
       final int executorThreads, long threadTimeOut, final 
AccumuloConfiguration conf,
       long timeBetweenThreadChecks) {
-    final ThreadPoolExecutor pool = 
ThreadPools.getServerThreadPools().createFixedThreadPool(
-        executorThreads, threadTimeOut, TimeUnit.MILLISECONDS, serverName + 
"-ClientPool", true);
+    final ThreadPoolExecutor pool = ThreadPools.getServerThreadPools()
+        .getPoolBuilder(serverName + 
"-ClientPool").numCoreThreads(executorThreads)
+        .withTimeOut(threadTimeOut, 
MILLISECONDS).enableThreadPoolMetrics().build();
     // periodically adjust the number of threads we need by checking how busy 
our threads are
     ThreadPools.watchCriticalFixedDelay(conf, timeBetweenThreadChecks, () -> {
       // there is a minor race condition between sampling the current state of 
the thread pool
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java
 
b/server/base/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java
index bce5912463..18de10604c 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java
@@ -121,8 +121,8 @@ public class RemoveEntriesForMissingFiles {
 
     Map<Path,Path> cache = new LRUMap<>(100000);
     Set<Path> processing = new HashSet<>();
-    ExecutorService threadPool =
-        ThreadPools.getServerThreadPools().createFixedThreadPool(16, 
"CheckFileTasks", false);
+    ExecutorService threadPool = 
ThreadPools.getServerThreadPools().getPoolBuilder("CheckFileTasks")
+        .numCoreThreads(16).build();
 
     System.out.printf("Scanning : %s %s\n", tableName, range);
 
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java
 
b/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java
index 3861ddf0ca..f1201830a3 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java
@@ -117,8 +117,9 @@ public class VerifyTabletAssignments {
       }
     }
 
-    ExecutorService tp =
-        ThreadPools.getServerThreadPools().createFixedThreadPool(20, 
"CheckTabletServer", false);
+    ExecutorService tp = 
ThreadPools.getServerThreadPools().getPoolBuilder("CheckTabletServer")
+        .numCoreThreads(20).build();
+
     for (final Entry<HostAndPort,List<KeyExtent>> entry : 
extentsPerServer.entrySet()) {
       Runnable r = () -> {
         try {
diff --git 
a/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/ReadyMonitorTest.java
 
b/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/ReadyMonitorTest.java
index bf61c02ab9..e52b39a419 100644
--- 
a/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/ReadyMonitorTest.java
+++ 
b/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/ReadyMonitorTest.java
@@ -60,8 +60,8 @@ public class ReadyMonitorTest {
     // these tests wait for workers to signal ready using count down latch.
     // size pool so some threads are likely to wait on others to complete.
     int numPoolThreads = numWorkerThreads / 2;
-    workerPool = 
ThreadPools.getServerThreadPools().createFixedThreadPool(numPoolThreads,
-        "readyMonitor-test-pool", false);
+    workerPool = 
ThreadPools.getServerThreadPools().getPoolBuilder("readyMonitor-test-pool")
+        .numCoreThreads(numPoolThreads).build();
   }
 
   @AfterEach
diff --git 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index b0ec498a9e..75690df777 100644
--- 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -348,8 +348,8 @@ public class CompactionCoordinator extends AbstractServer
   }
 
   private void updateSummaries() {
-    ExecutorService executor = 
ThreadPools.getServerThreadPools().createFixedThreadPool(10,
-        "Compaction Summary Gatherer", false);
+    ExecutorService executor = ThreadPools.getServerThreadPools()
+        .getPoolBuilder("Compaction Summary 
Gatherer").numCoreThreads(10).build();
     try {
       Set<String> queuesSeen = new ConcurrentSkipListSet<>();
 
diff --git 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
index 5c259b13f4..45b6161bab 100644
--- 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
+++ 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
@@ -18,6 +18,7 @@
  */
 package org.apache.accumulo.coordinator;
 
+import static java.util.concurrent.TimeUnit.MINUTES;
 import static java.util.function.Function.identity;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toMap;
@@ -74,11 +75,13 @@ public class CompactionFinalizer {
     int max = this.context.getConfiguration()
         
.getCount(Property.COMPACTION_COORDINATOR_FINALIZER_TSERVER_NOTIFIER_MAXTHREADS);
 
-    this.ntfyExecutor = ThreadPools.getServerThreadPools().createThreadPool(3, max, 1,
-        TimeUnit.MINUTES, "Compaction Finalizer Notifier", true);
+    this.ntfyExecutor = ThreadPools.getServerThreadPools()
+        .getPoolBuilder("Compaction Finalizer Notifier").numCoreThreads(3).numMaxThreads(max)
+        .withTimeOut(1L, MINUTES).enableThreadPoolMetrics().build();
 
-    this.backgroundExecutor = ThreadPools.getServerThreadPools().createFixedThreadPool(1,
-        "Compaction Finalizer Background Task", true);
+    this.backgroundExecutor =
+        ThreadPools.getServerThreadPools().getPoolBuilder("Compaction Finalizer Background Task")
+            .numCoreThreads(1).enableThreadPoolMetrics().build();
 
     backgroundExecutor.execute(() -> {
       processPending();
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java 
b/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java
index c03d4496ca..cd1753b67f 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java
@@ -292,7 +292,7 @@ public class GCRun implements GarbageCollectionEnvironment {
     minimizeDeletes(confirmedDeletes, processedDeletes, fs, log);
 
     ExecutorService deleteThreadPool = ThreadPools.getServerThreadPools()
-        .createExecutorService(config, Property.GC_DELETE_THREADS, false);
+        .createExecutorService(config, Property.GC_DELETE_THREADS);
 
     final List<Pair<Path,Path>> replacements = context.getVolumeReplacements();
 
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 1b63f18381..b81e0c7456 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -965,7 +965,7 @@ public class Manager extends AbstractServer
     final long rpcTimeout = 
getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT);
     int threads = 
getConfiguration().getCount(Property.MANAGER_STATUS_THREAD_POOL_SIZE);
     ExecutorService tp = ThreadPools.getServerThreadPools()
-        .createExecutorService(getConfiguration(), 
Property.MANAGER_STATUS_THREAD_POOL_SIZE, false);
+        .createExecutorService(getConfiguration(), 
Property.MANAGER_STATUS_THREAD_POOL_SIZE);
     long start = System.currentTimeMillis();
     final SortedMap<TServerInstance,TabletServerStatus> result = new 
ConcurrentSkipListMap<>();
     final RateLimiter shutdownServerRateLimiter = 
RateLimiter.create(MAX_SHUTDOWNS_PER_SEC);
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ReplicationMetrics.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ReplicationMetrics.java
index 77891ad49c..a5e9213948 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ReplicationMetrics.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ReplicationMetrics.java
@@ -163,7 +163,7 @@ public class ReplicationMetrics implements MetricsProducer {
         new AtomicInteger(0));
 
     ScheduledExecutorService scheduler = ThreadPools.getServerThreadPools()
-        .createScheduledExecutorService(1, "replicationMetricsPoller", false);
+        .createScheduledExecutorService(1, "replicationMetricsPoller");
     Runtime.getRuntime().addShutdownHook(new Thread(scheduler::shutdownNow));
     long minimumRefreshDelay = TimeUnit.SECONDS.toMillis(5);
     ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(this::update, 
minimumRefreshDelay,
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java
index 758884d5b4..800455fe0e 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java
@@ -152,8 +152,8 @@ public class FateMetrics implements MetricsProducer {
     update();
 
     // get fate status is read only operation - no reason to be nice on shutdown.
-    ScheduledExecutorService scheduler = ThreadPools.getServerThreadPools()
-        .createScheduledExecutorService(1, "fateMetricsPoller", false);
+    ScheduledExecutorService scheduler =
+        ThreadPools.getServerThreadPools().createScheduledExecutorService(1, "fateMetricsPoller");
     Runtime.getRuntime().addShutdownHook(new Thread(scheduler::shutdownNow));
 
     ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java
index 3f8261a480..f405881cb6 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java
@@ -73,8 +73,8 @@ public class RecoveryManager {
         CacheBuilder.newBuilder().expireAfterWrite(timeToCacheExistsInMillis, 
TimeUnit.MILLISECONDS)
             .maximumWeight(10_000_000).weigher((path, exist) -> 
path.toString().length()).build();
 
-    executor = 
ThreadPools.getServerThreadPools().createScheduledExecutorService(4,
-        "Walog sort starter", false);
+    executor =
+        ThreadPools.getServerThreadPools().createScheduledExecutorService(4, 
"Walog sort starter");
     zooCache = new ZooCache(manager.getContext().getZooReader(), null);
     try {
       List<String> workIDs =
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer1/BulkImport.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer1/BulkImport.java
index b07f7c79ab..ee0694fd56 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer1/BulkImport.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer1/BulkImport.java
@@ -208,9 +208,9 @@ public class BulkImport extends ManagerRepo {
 
     AccumuloConfiguration serverConfig = manager.getConfiguration();
     @SuppressWarnings("deprecation")
-    ExecutorService workers = 
ThreadPools.getServerThreadPools().createExecutorService(serverConfig,
-        serverConfig.resolve(Property.MANAGER_RENAME_THREADS, 
Property.MANAGER_BULK_RENAME_THREADS),
-        false);
+    ExecutorService workers =
+        ThreadPools.getServerThreadPools().createExecutorService(serverConfig, 
serverConfig
+            .resolve(Property.MANAGER_RENAME_THREADS, 
Property.MANAGER_BULK_RENAME_THREADS));
     List<Future<Exception>> results = new ArrayList<>();
 
     for (FileStatus file : mapFiles) {
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
index d62e824e05..11cb713b58 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
@@ -18,6 +18,8 @@
  */
 package org.apache.accumulo.manager.upgrade;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
+
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
@@ -25,7 +27,6 @@ import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
 import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloException;
@@ -193,9 +194,9 @@ public class UpgradeCoordinator {
         "Not currently in a suitable state to do metadata upgrade %s", status);
 
     if (currentVersion < AccumuloDataVersion.get()) {
-      return ThreadPools.getServerThreadPools().createThreadPool(0, Integer.MAX_VALUE, 60L,
-          TimeUnit.SECONDS, "UpgradeMetadataThreads", new SynchronousQueue<>(), false)
-          .submit(() -> {
+      return ThreadPools.getServerThreadPools().getPoolBuilder("UpgradeMetadataThreads")
+          .numCoreThreads(0).numMaxThreads(Integer.MAX_VALUE).withTimeOut(60L, SECONDS)
+          .withQueue(new SynchronousQueue<>()).build().submit(() -> {
             try {
               for (int v = currentVersion; v < AccumuloDataVersion.get(); v++) 
{
                 log.info("Upgrading Root from data version {}", v);
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index e41b99db97..8e9013fdc5 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -985,7 +985,7 @@ public class TabletServer extends AbstractServer implements 
TabletHostingServer
 
     // Start the pool to handle outgoing replications
     final ThreadPoolExecutor replicationThreadPool = 
ThreadPools.getServerThreadPools()
-        .createExecutorService(getConfiguration(), 
Property.REPLICATION_WORKER_THREADS, false);
+        .createExecutorService(getConfiguration(), 
Property.REPLICATION_WORKER_THREADS);
     replWorker.setExecutor(replicationThreadPool);
     replWorker.run();
 
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index 3b14ab3728..3d80f9c1ed 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -19,6 +19,8 @@
 package org.apache.accumulo.tserver;
 
 import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static java.util.stream.Collectors.toUnmodifiableMap;
 import static 
org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly;
 
@@ -134,7 +136,7 @@ public class TabletServerResourceManager {
   private void modifyThreadPoolSizesAtRuntime(IntSupplier maxThreads, String 
name,
       final ThreadPoolExecutor tp) {
     
ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor().scheduleWithFixedDelay(
-        () -> ThreadPools.resizePool(tp, maxThreads, name), 1, 10, 
TimeUnit.SECONDS));
+        () -> ThreadPools.resizePool(tp, maxThreads, name), 1, 10, SECONDS));
   }
 
   private ThreadPoolExecutor createPriorityExecutor(ScanExecutorConfig sec,
@@ -183,9 +185,10 @@ public class TabletServerResourceManager {
 
     scanExecQueues.put(sec.name, queue);
 
-    ThreadPoolExecutor es = ThreadPools.getServerThreadPools().createThreadPool(
-        sec.getCurrentMaxThreads(), sec.getCurrentMaxThreads(), 0L, TimeUnit.MILLISECONDS,
-        "scan-" + sec.name, queue, sec.priority, true);
+    ThreadPoolExecutor es = ThreadPools.getServerThreadPools().getPoolBuilder("scan-" + sec.name)
+        .numCoreThreads(sec.getCurrentMaxThreads()).numMaxThreads(sec.getCurrentMaxThreads())
+        .withTimeOut(0L, MILLISECONDS).withQueue(queue).atPriority(sec.priority)
+        .enableThreadPoolMetrics().build();
     modifyThreadPoolSizesAtRuntime(sec::getCurrentMaxThreads, "scan-" + sec.name, es);
     return es;
 
@@ -304,14 +307,17 @@ public class TabletServerResourceManager {
         () -> 
context.getConfiguration().getCount(Property.TSERV_MINC_MAXCONCURRENT),
         "minor compactor", minorCompactionThreadPool);
 
-    splitThreadPool = ThreadPools.getServerThreadPools().createThreadPool(0, 1, 1, TimeUnit.SECONDS,
-        "splitter", true);
+    splitThreadPool =
+        ThreadPools.getServerThreadPools().getPoolBuilder("splitter").numCoreThreads(0)
+            .numMaxThreads(1).withTimeOut(1, SECONDS).enableThreadPoolMetrics().build();
 
-    defaultSplitThreadPool = ThreadPools.getServerThreadPools().createThreadPool(0, 1, 60,
-        TimeUnit.SECONDS, "md splitter", true);
+    defaultSplitThreadPool =
+        ThreadPools.getServerThreadPools().getPoolBuilder("md splitter").numCoreThreads(0)
+            .numMaxThreads(1).withTimeOut(60, SECONDS).enableThreadPoolMetrics().build();
 
-    defaultMigrationPool = ThreadPools.getServerThreadPools().createThreadPool(0, 1, 60,
-        TimeUnit.SECONDS, "metadata tablet migration", true);
+    defaultMigrationPool = ThreadPools.getServerThreadPools()
+        .getPoolBuilder("metadata tablet migration").numCoreThreads(0).numMaxThreads(1)
+        .withTimeOut(60, SECONDS).enableThreadPoolMetrics().build();
 
     migrationPool = 
ThreadPools.getServerThreadPools().createExecutorService(acuConf,
         Property.TSERV_MIGRATE_MAXCONCURRENT, true);
@@ -330,8 +336,9 @@ public class TabletServerResourceManager {
         () -> 
context.getConfiguration().getCount(Property.TSERV_ASSIGNMENT_MAXCONCURRENT),
         "tablet assignment", assignmentPool);
 
-    assignMetaDataPool = 
ThreadPools.getServerThreadPools().createThreadPool(0, 1, 60,
-        TimeUnit.SECONDS, "metadata tablet assignment", true);
+    assignMetaDataPool = ThreadPools.getServerThreadPools()
+        .getPoolBuilder("metadata tablet 
assignment").numCoreThreads(0).numMaxThreads(1)
+        .withTimeOut(60, SECONDS).enableThreadPoolMetrics().build();
 
     activeAssignments = new ConcurrentHashMap<>();
 
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
index 78f5fc1173..3054db17ea 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
@@ -18,6 +18,7 @@
  */
 package org.apache.accumulo.tserver.compactions;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.MINUTES;
 
 import java.util.Collection;
@@ -35,7 +36,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -132,8 +132,8 @@ public class CompactionService {
 
     this.executors = Map.copyOf(tmpExecutors);
 
-    this.planningExecutor = ThreadPools.getServerThreadPools().createThreadPool(1, 1, 0L,
-        TimeUnit.MILLISECONDS, "CompactionPlanner", false);
+    this.planningExecutor = ThreadPools.getServerThreadPools().getPoolBuilder("CompactionPlanner")
+        .numCoreThreads(1).numMaxThreads(1).withTimeOut(0L, MILLISECONDS).build();
 
     this.queuedForPlanning = new EnumMap<>(CompactionKind.class);
     for (CompactionKind kind : CompactionKind.values()) {
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java
index ffdebedca9..229f53ca77 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java
@@ -18,6 +18,8 @@
  */
 package org.apache.accumulo.tserver.compactions;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -27,7 +29,6 @@ import java.util.Set;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -172,9 +173,9 @@ public class InternalCompactionExecutor implements 
CompactionExecutor {
 
     queue = new PriorityBlockingQueue<>(100, comparator);
 
-    threadPool = ThreadPools.getServerThreadPools().createThreadPool(threads, threads, 60,
-        TimeUnit.SECONDS, "compaction." + ceid, queue, false);
-
+    threadPool = ThreadPools.getServerThreadPools().getPoolBuilder("compaction." + ceid)
+        .numCoreThreads(threads).numMaxThreads(threads).withTimeOut(60L, SECONDS).withQueue(queue)
+        .build();
     metricCloser =
         ceMetrics.addExecutor(ceid, () -> threadPool.getActiveCount(), () -> 
queuedJob.size());
 
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
index 8884f398a8..6fc396e4f4 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
@@ -296,8 +296,9 @@ public class LogSorter {
     @SuppressWarnings("deprecation")
     int threadPoolSize = this.conf.getCount(this.conf
         .resolve(Property.TSERV_WAL_SORT_MAX_CONCURRENT, 
Property.TSERV_RECOVERY_MAX_CONCURRENT));
-    ThreadPoolExecutor threadPool = ThreadPools.getServerThreadPools()
-        .createFixedThreadPool(threadPoolSize, this.getClass().getName(), true);
+    ThreadPoolExecutor threadPool =
+        ThreadPools.getServerThreadPools().getPoolBuilder(this.getClass().getName())
+            .numCoreThreads(threadPoolSize).enableThreadPoolMetrics().build();
     new DistributedWorkQueue(context.getZooKeeperRoot() + Constants.ZRECOVERY, 
sortedLogConf,
         context).startProcessing(new LogProcessor(), threadPool);
   }
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index 2e5bc3e4b1..346b70166c 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -262,8 +262,8 @@ public class TabletServerLogger {
     if (nextLogMaker != null) {
       return;
     }
-    nextLogMaker =
-        ThreadPools.getServerThreadPools().createFixedThreadPool(1, "WALog creator", true);
+    nextLogMaker = ThreadPools.getServerThreadPools().getPoolBuilder("WALog creator")
+        .numCoreThreads(1).enableThreadPoolMetrics().build();
     nextLogMaker.execute(new Runnable() {
       @Override
       public void run() {
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/CompactionExecutorsMetrics.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/CompactionExecutorsMetrics.java
index dfb3f87e25..e3c6bb5150 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/CompactionExecutorsMetrics.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/CompactionExecutorsMetrics.java
@@ -77,7 +77,7 @@ public class CompactionExecutorsMetrics implements 
MetricsProducer {
 
   protected void startUpdateThread() {
     ScheduledExecutorService scheduler = ThreadPools.getServerThreadPools()
-        .createScheduledExecutorService(1, "compactionExecutorsMetricsPoller", 
false);
+        .createScheduledExecutorService(1, "compactionExecutorsMetricsPoller");
     Runtime.getRuntime().addShutdownHook(new Thread(scheduler::shutdownNow));
     long minimumRefreshDelay = TimeUnit.SECONDS.toMillis(5);
     
ThreadPools.watchNonCriticalScheduledTask(scheduler.scheduleAtFixedRate(this::update,
diff --git 
a/test/src/main/java/org/apache/accumulo/test/BalanceWithOfflineTableIT.java 
b/test/src/main/java/org/apache/accumulo/test/BalanceWithOfflineTableIT.java
index 2f45208e26..40c1487e58 100644
--- a/test/src/main/java/org/apache/accumulo/test/BalanceWithOfflineTableIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/BalanceWithOfflineTableIT.java
@@ -78,8 +78,9 @@ public class BalanceWithOfflineTableIT extends 
ConfigurableMacBase {
 
       log.info("Waiting for balance");
 
-      ExecutorService pool =
-          ThreadPools.getServerThreadPools().createFixedThreadPool(1, "waitForBalance", false);
+      ExecutorService pool = ThreadPools.getServerThreadPools().getPoolBuilder("waitForBalance")
+          .numCoreThreads(1).build();
+
       Future<Boolean> wait = pool.submit(() -> {
         c.instanceOperations().waitForBalance();
         return true;
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java
 
b/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java
index 80e422e6d6..101c9fc65e 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java
@@ -213,7 +213,7 @@ public class BatchWriterFlushIT extends 
AccumuloClusterHarness {
       }
 
       ThreadPoolExecutor threads = ThreadPools.getServerThreadPools()
-          .createFixedThreadPool(NUM_THREADS, "ClientThreads", false);
+          .getPoolBuilder("ClientThreads").numCoreThreads(NUM_THREADS).build();
       threads.allowCoreThreadTimeOut(false);
       threads.prestartAllCoreThreads();
 

Reply via email to