This is an automated email from the ASF dual-hosted git repository. edcoleman pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new ec8ae122ed Use fluent-style builder for pool creation, replacing overloaded methods (#4384) ec8ae122ed is described below commit ec8ae122edfd5c25601694d107f18af014e28383 Author: EdColeman <d...@etcoleman.com> AuthorDate: Tue Mar 19 13:31:38 2024 -0400 Use fluent-style builder for pool creation, replacing overloaded methods (#4384) * Use fluent-style builder for pool creation, replacing overloaded methods * Replaces overloaded createThreadPool methods with a fluent-style builder (ThreadPoolExecutorBuilderTest). * Adds ThreadPoolExecutorBuilderTest test, * Adds `createExecutorService` method that does not have an option to enable metrics and replaces all occurrences in the code that was calling the alternate method with emitThreadPoolMetrics=false to use it. The `createScheduledExecutorService` will be refactored in a future PR when service initialization is reworked. See PR #4342 for an example. This change isolates where metrics can be enabled and makes finding them easier. It also will make reviewing future changes easier because those changes will be isolated to places that currently enable thread pool metrics. --- .../accumulo/core/clientImpl/ClientContext.java | 9 +- .../core/clientImpl/ConditionalWriterImpl.java | 2 +- .../core/clientImpl/InstanceOperationsImpl.java | 4 +- .../core/clientImpl/TableOperationsImpl.java | 3 +- .../core/clientImpl/TabletServerBatchReader.java | 5 +- .../core/clientImpl/TabletServerBatchWriter.java | 8 +- .../accumulo/core/clientImpl/bulk/BulkImport.java | 8 +- .../accumulo/core/file/BloomFilterLayer.java | 5 +- .../file/blockfile/cache/lru/LruBlockCache.java | 4 +- .../blockfile/cache/tinylfu/TinyLfuBlockCache.java | 2 +- .../util/compaction/ExternalCompactionUtil.java | 10 +- .../accumulo/core/util/threads/ThreadPools.java | 419 +++++++++++++-------- .../core/file/rfile/MultiThreadedRFileTest.java | 7 +- .../threads/ThreadPoolExecutorBuilderTest.java | 79 ++++ .../accumulo/server/client/BulkImporter.java | 10 +- .../server/conf/ServerConfigurationFactory.java | 4 +- .../conf/store/impl/PropCacheCaffeineImpl.java | 7 +- .../server/conf/store/impl/PropStoreWatcher.java | 5 +- .../accumulo/server/fs/VolumeManagerImpl.java | 4 +- .../accumulo/server/problems/ProblemReports.java | 7 +- .../apache/accumulo/server/rpc/TServerUtils.java | 7 +- .../server/util/RemoveEntriesForMissingFiles.java | 4 +- .../server/util/VerifyTabletAssignments.java | 5 +- .../server/conf/store/impl/ReadyMonitorTest.java | 4 +- .../coordinator/CompactionCoordinator.java | 4 +- .../accumulo/coordinator/CompactionFinalizer.java | 11 +- .../main/java/org/apache/accumulo/gc/GCRun.java | 2 +- .../java/org/apache/accumulo/manager/Manager.java | 2 +- .../manager/metrics/ReplicationMetrics.java | 2 +- .../accumulo/manager/metrics/fate/FateMetrics.java | 4 +- .../accumulo/manager/recovery/RecoveryManager.java | 4 +- .../manager/tableOps/bulkVer1/BulkImport.java | 6 +- .../manager/upgrade/UpgradeCoordinator.java | 9 +- .../org/apache/accumulo/tserver/TabletServer.java | 2 +- .../tserver/TabletServerResourceManager.java | 31 +- .../tserver/compactions/CompactionService.java | 6 +- .../compactions/InternalCompactionExecutor.java | 9 +- .../org/apache/accumulo/tserver/log/LogSorter.java | 5 +- .../accumulo/tserver/log/TabletServerLogger.java | 4 +- .../metrics/CompactionExecutorsMetrics.java | 2 +- .../accumulo/test/BalanceWithOfflineTableIT.java | 5 +- .../test/functional/BatchWriterFlushIT.java | 2 +- 42 files changed, 465 insertions(+), 267 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java index 8f7993812a..d02a2c743e 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java @@ -260,8 +260,9 @@ public class ClientContext implements AccumuloClient { submitScannerReadAheadTask(Callable<List<KeyValue>> c) { ensureOpen(); if (scannerReadaheadPool == null) { - scannerReadaheadPool = clientThreadPools.createThreadPool(0, Integer.MAX_VALUE, 3L, SECONDS, - "Accumulo scanner read ahead thread", new SynchronousQueue<>(), true); + scannerReadaheadPool = clientThreadPools.getPoolBuilder("Accumulo scanner read ahead thread") + .numCoreThreads(0).numMaxThreads(Integer.MAX_VALUE).withTimeOut(3L, SECONDS) + .withQueue(new SynchronousQueue<>()).enableThreadPoolMetrics().build(); } return scannerReadaheadPool.submit(c); } @@ -269,8 +270,8 @@ public class ClientContext implements AccumuloClient { public synchronized void executeCleanupTask(Runnable r) { ensureOpen(); if (cleanupThreadPool == null) { - cleanupThreadPool = clientThreadPools.createFixedThreadPool(1, 3, SECONDS, - "Conditional Writer Cleanup Thread", true); + cleanupThreadPool = clientThreadPools.getPoolBuilder("Conditional Writer Cleanup Thread") + .numCoreThreads(1).withTimeOut(3L, SECONDS).enableThreadPoolMetrics().build(); } this.cleanupThreadPool.execute(r); } diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java index 3d6f1f2cf2..e9802300a3 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java @@ -371,7 +371,7 @@ class ConditionalWriterImpl implements ConditionalWriter { this.auths = config.getAuthorizations(); this.ve = new VisibilityEvaluator(config.getAuthorizations()); this.threadPool = context.threadPools().createScheduledExecutorService( - config.getMaxWriteThreads(), this.getClass().getSimpleName(), false); + config.getMaxWriteThreads(), this.getClass().getSimpleName()); this.locator = new SyncingTabletLocator(context, tableId); this.serverQueues = new HashMap<>(); this.tableId = tableId; diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java index 37229b88c0..084d59ef11 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java @@ -301,8 +301,8 @@ public class InstanceOperationsImpl implements InstanceOperations { List<String> tservers = getTabletServers(); int numThreads = Math.max(4, Math.min((tservers.size() + compactors.size()) / 10, 256)); - var executorService = - context.threadPools().createFixedThreadPool(numThreads, "getactivecompactions", false); + var executorService = context.threadPools().getPoolBuilder("getactivecompactions") + .numCoreThreads(numThreads).build(); try { List<Future<List<ActiveCompaction>>> futures = new ArrayList<>(); diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java index 16f1d92284..129cb6a681 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java @@ -495,7 +495,8 @@ public class TableOperationsImpl extends TableOperationsHelper { CountDownLatch latch = new CountDownLatch(splits.size()); AtomicReference<Exception> exception = new AtomicReference<>(null); - ExecutorService executor = context.threadPools().createFixedThreadPool(16, "addSplits", false); + ExecutorService executor = + context.threadPools().getPoolBuilder("addSplits").numCoreThreads(16).build(); try { executor.execute( new SplitTask(new SplitEnv(tableName, tableId, executor, latch, exception), splits)); diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java index 5c2f6229e8..23f40e9be3 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java @@ -71,8 +71,9 @@ public class TabletServerBatchReader extends ScannerOptions implements BatchScan this.tableName = tableName; this.numThreads = numQueryThreads; - queryThreadPool = context.threadPools().createFixedThreadPool(numQueryThreads, - "batch scanner " + batchReaderInstance + "-", false); + queryThreadPool = + context.threadPools().getPoolBuilder("batch scanner " + batchReaderInstance + "-") + .numCoreThreads(numQueryThreads).build(); // Call shutdown on this thread pool in case the caller does not call close(). cleanable = CleanerUtil.shutdownThreadPoolExecutor(queryThreadPool, closed, log); } diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java index c4461a3d71..980ba0408a 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java @@ -672,11 +672,11 @@ public class TabletServerBatchWriter implements AutoCloseable { public MutationWriter(int numSendThreads) { serversMutations = new HashMap<>(); queued = new HashSet<>(); - sendThreadPool = context.threadPools().createFixedThreadPool(numSendThreads, - this.getClass().getName(), false); + sendThreadPool = context.threadPools().getPoolBuilder(this.getClass().getName()) + .numCoreThreads(numSendThreads).build(); locators = new HashMap<>(); - binningThreadPool = context.threadPools().createFixedThreadPool(1, "BinMutations", - new SynchronousQueue<>(), false); + binningThreadPool = context.threadPools().getPoolBuilder("BinMutations").numCoreThreads(1) + .withQueue(new SynchronousQueue<>()).build(); binningThreadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); } diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java index 4810048e93..f13420d006 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java @@ -482,12 +482,12 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti if (this.executor != null) { executor = this.executor; } else if (numThreads > 0) { - executor = service = - context.threadPools().createFixedThreadPool(numThreads, "BulkImportThread", false); + executor = service = context.threadPools().getPoolBuilder("BulkImportThread") + .numCoreThreads(numThreads).build(); } else { String threads = context.getConfiguration().get(ClientProperty.BULK_LOAD_THREADS.getKey()); - executor = service = context.threadPools().createFixedThreadPool( - ConfigurationTypeHelper.getNumThreads(threads), "BulkImportThread", false); + executor = service = context.threadPools().getPoolBuilder("BulkImportThread") + .numCoreThreads(ConfigurationTypeHelper.getNumThreads(threads)).build(); } try { diff --git a/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java b/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java index 39cd7d2d13..318f87a88e 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java +++ b/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java @@ -79,10 +79,9 @@ public class BloomFilterLayer { } if (maxLoadThreads > 0) { - loadThreadPool = ThreadPools.getServerThreadPools().createThreadPool(0, maxLoadThreads, 60, - SECONDS, "bloom-loader", false); + loadThreadPool = ThreadPools.getServerThreadPools().getPoolBuilder("bloom-loader") + .numCoreThreads(0).numMaxThreads(maxLoadThreads).withTimeOut(60L, SECONDS).build(); } - return loadThreadPool; } diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java index 01972f8ffe..0183ebe3bb 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java @@ -102,8 +102,8 @@ public class LruBlockCache extends SynchronousLoadingBlockCache implements Block private final EvictionThread evictionThread; /** Statistics thread schedule pool (for heavy debugging, could remove) */ - private final ScheduledExecutorService scheduleThreadPool = ThreadPools.getServerThreadPools() - .createScheduledExecutorService(1, "LRUBlockCacheStats", false); + private final ScheduledExecutorService scheduleThreadPool = + ThreadPools.getServerThreadPools().createScheduledExecutorService(1, "LRUBlockCacheStats"); /** Current size of cache */ private final AtomicLong size; diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java index 04cab1a38c..46a07682bd 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java @@ -62,7 +62,7 @@ public final class TinyLfuBlockCache implements BlockCache { private final Policy.Eviction<String,Block> policy; private final int maxSize; private final ScheduledExecutorService statsExecutor = ThreadPools.getServerThreadPools() - .createScheduledExecutorService(1, "TinyLfuBlockCacheStatsExecutor", false); + .createScheduledExecutorService(1, "TinyLfuBlockCacheStatsExecutor"); public TinyLfuBlockCache(Configuration conf, CacheType type) { cache = Caffeine.newBuilder() diff --git a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java index 5d0107bfb0..a974836ab0 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java @@ -222,9 +222,8 @@ public class ExternalCompactionUtil { */ public static List<RunningCompaction> getCompactionsRunningOnCompactors(ClientContext context) { final List<RunningCompactionFuture> rcFutures = new ArrayList<>(); - final ExecutorService executor = ThreadPools.getServerThreadPools().createFixedThreadPool(16, - "CompactorRunningCompactions", false); - + final ExecutorService executor = ThreadPools.getServerThreadPools() + .getPoolBuilder("CompactorRunningCompactions").numCoreThreads(16).build(); getCompactorAddrs(context).forEach((q, hp) -> { hp.forEach(hostAndPort -> { rcFutures.add(new RunningCompactionFuture(q, hostAndPort, @@ -250,9 +249,8 @@ public class ExternalCompactionUtil { public static Collection<ExternalCompactionId> getCompactionIdsRunningOnCompactors(ClientContext context) { - final ExecutorService executor = ThreadPools.getServerThreadPools().createFixedThreadPool(16, - "CompactorRunningCompactions", false); - + final ExecutorService executor = ThreadPools.getServerThreadPools() + .getPoolBuilder("CompactorRunningCompactions").numCoreThreads(16).build(); List<Future<ExternalCompactionId>> futures = new ArrayList<>(); getCompactorAddrs(context).forEach((q, hp) -> { diff --git a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java index 7206118ce4..147f19e5dd 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java +++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java @@ -19,6 +19,7 @@ package org.apache.accumulo.core.util.threads; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; import java.lang.Thread.UncaughtExceptionHandler; @@ -43,9 +44,12 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.metrics.MetricsUtil; import org.apache.accumulo.core.trace.TraceUtil; +import org.checkerframework.checker.nullness.qual.NonNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; + import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; @SuppressFBWarnings(value = "RV_EXCEPTION_NOT_THROWN", @@ -64,7 +68,7 @@ public class ThreadPools { private static final Logger LOG = LoggerFactory.getLogger(ThreadPools.class); // the number of seconds before we allow a thread to terminate with non-use. - public static final long DEFAULT_TIMEOUT_MILLISECS = 180000L; + public static final long DEFAULT_TIMEOUT_MILLISECS = MINUTES.toMillis(3); private static final ThreadPools SERVER_INSTANCE = new ThreadPools(Threads.UEH); @@ -77,7 +81,7 @@ public class ThreadPools { } private static final ThreadPoolExecutor SCHEDULED_FUTURE_CHECKER_POOL = - getServerThreadPools().createFixedThreadPool(1, "Scheduled Future Checker", false); + getServerThreadPools().getPoolBuilder("Scheduled Future Checker").numCoreThreads(1).build(); private static final ConcurrentLinkedQueue<ScheduledFuture<?>> CRITICAL_RUNNING_TASKS = new ConcurrentLinkedQueue<>(); @@ -85,7 +89,7 @@ public class ThreadPools { private static final ConcurrentLinkedQueue<ScheduledFuture<?>> NON_CRITICAL_RUNNING_TASKS = new ConcurrentLinkedQueue<>(); - private static Runnable TASK_CHECKER = new Runnable() { + private static final Runnable TASK_CHECKER = new Runnable() { @Override public void run() { final List<ConcurrentLinkedQueue<ScheduledFuture<?>>> queues = @@ -100,7 +104,7 @@ public class ThreadPools { } }); try { - TimeUnit.MINUTES.sleep(1); + MINUTES.sleep(1); } catch (InterruptedException ie) { // This thread was interrupted by something while sleeping. We don't want to exit // this thread, so reset the interrupt state on this thread and keep going. @@ -232,6 +236,20 @@ public class ThreadPools { handler = ueh; } + /** + * Create a thread pool based on a thread pool related property. The pool will not be instrumented + * without additional metrics. This method should be preferred, especially for short-lived pools. + * + * @param conf accumulo configuration + * @param p thread pool related property + * @return ExecutorService impl + * @throws IllegalArgumentException if property is not handled + */ + public ThreadPoolExecutor createExecutorService(final AccumuloConfiguration conf, + final Property p) { + return createExecutorService(conf, p, false); + } + /** * Create a thread pool based on a thread pool related property * @@ -239,173 +257,226 @@ public class ThreadPools { * @param p thread pool related property * @param emitThreadPoolMetrics When set to true will emit metrics and register the metrics in a * static registry. After the thread pool is deleted, there will still be metrics objects - * related to it in the static registry. There is no way to clean these left over objects - * up therefore its recommended that this option only be set true for long lived thread - * pools. Creating lots of short lived thread pools and registering them can lead to out of - * memory errors over long time periods. + * related to it in the static registry. There is no way to clean these leftover objects up + * therefore its recommended that this option only be set true for long-lived thread pools. + * Creating lots of short-lived thread pools and registering them can lead to out of memory + * errors over long time periods. * @return ExecutorService impl - * @throws RuntimeException if property is not handled + * @throws IllegalArgumentException if property is not handled */ @SuppressWarnings("deprecation") public ThreadPoolExecutor createExecutorService(final AccumuloConfiguration conf, final Property p, boolean emitThreadPoolMetrics) { - + ThreadPoolExecutorBuilder builder; switch (p) { case GENERAL_SIMPLETIMER_THREADPOOL_SIZE: - return createScheduledExecutorService(conf.getCount(p), "SimpleTimer", - emitThreadPoolMetrics); + return createScheduledExecutorService(conf.getCount(p), "SimpleTimer"); case GENERAL_THREADPOOL_SIZE: return createScheduledExecutorService(conf.getCount(p), "GeneralExecutor", emitThreadPoolMetrics); case MANAGER_BULK_THREADPOOL_SIZE: - return createFixedThreadPool(conf.getCount(p), - conf.getTimeInMillis(Property.MANAGER_BULK_THREADPOOL_TIMEOUT), MILLISECONDS, - "bulk import", emitThreadPoolMetrics); + builder = getPoolBuilder("bulk import").numCoreThreads(conf.getCount(p)).withTimeOut( + conf.getTimeInMillis(Property.MANAGER_BULK_THREADPOOL_TIMEOUT), MILLISECONDS); + if (emitThreadPoolMetrics) { + builder.enableThreadPoolMetrics(); + } + return builder.build(); case MANAGER_RENAME_THREADS: - return createFixedThreadPool(conf.getCount(p), "bulk move", emitThreadPoolMetrics); + builder = getPoolBuilder("bulk move").numCoreThreads(conf.getCount(p)); + if (emitThreadPoolMetrics) { + builder.enableThreadPoolMetrics(); + } + return builder.build(); case MANAGER_FATE_THREADPOOL_SIZE: - return createFixedThreadPool(conf.getCount(p), "Repo Runner", emitThreadPoolMetrics); + builder = getPoolBuilder("Repo Runner").numCoreThreads(conf.getCount(p)); + if (emitThreadPoolMetrics) { + builder.enableThreadPoolMetrics(); + } + return builder.build(); case MANAGER_STATUS_THREAD_POOL_SIZE: + builder = getPoolBuilder("GatherTableInformation"); int threads = conf.getCount(p); if (threads == 0) { - return createThreadPool(0, Integer.MAX_VALUE, 60L, SECONDS, "GatherTableInformation", - new SynchronousQueue<>(), emitThreadPoolMetrics); + builder.numCoreThreads(0).numMaxThreads(Integer.MAX_VALUE).withTimeOut(60L, SECONDS) + .withQueue(new SynchronousQueue<>()); } else { - return createFixedThreadPool(threads, "GatherTableInformation", emitThreadPoolMetrics); + builder.numCoreThreads(threads); + } + if (emitThreadPoolMetrics) { + builder.enableThreadPoolMetrics(); } + return builder.build(); case TSERV_WORKQ_THREADS: - return createFixedThreadPool(conf.getCount(p), "distributed work queue", - emitThreadPoolMetrics); + builder = getPoolBuilder("distributed work queue").numCoreThreads(conf.getCount(p)); + if (emitThreadPoolMetrics) { + builder.enableThreadPoolMetrics(); + } + return builder.build(); case TSERV_MINC_MAXCONCURRENT: - return createFixedThreadPool(conf.getCount(p), 0L, MILLISECONDS, "minor compactor", - emitThreadPoolMetrics); + builder = getPoolBuilder("minor compactor").numCoreThreads(conf.getCount(p)).withTimeOut(0L, + MILLISECONDS); + if (emitThreadPoolMetrics) { + builder.enableThreadPoolMetrics(); + } + return builder.build(); case TSERV_MIGRATE_MAXCONCURRENT: - return createFixedThreadPool(conf.getCount(p), 0L, MILLISECONDS, "tablet migration", - emitThreadPoolMetrics); + builder = getPoolBuilder("tablet migration").numCoreThreads(conf.getCount(p)) + .withTimeOut(0L, MILLISECONDS); + if (emitThreadPoolMetrics) { + builder.enableThreadPoolMetrics(); + } + return builder.build(); case TSERV_ASSIGNMENT_MAXCONCURRENT: - return createFixedThreadPool(conf.getCount(p), 0L, MILLISECONDS, "tablet assignment", - emitThreadPoolMetrics); + builder = getPoolBuilder("tablet assignment").numCoreThreads(conf.getCount(p)) + .withTimeOut(0L, MILLISECONDS); + if (emitThreadPoolMetrics) { + builder.enableThreadPoolMetrics(); + } + return builder.build(); case TSERV_SUMMARY_RETRIEVAL_THREADS: - return createThreadPool(conf.getCount(p), conf.getCount(p), 60, SECONDS, - "summary file retriever", emitThreadPoolMetrics); + builder = getPoolBuilder("summary file retriever").numCoreThreads(conf.getCount(p)) + .withTimeOut(60L, MILLISECONDS); + if (emitThreadPoolMetrics) { + builder.enableThreadPoolMetrics(); + } + return builder.build(); case TSERV_SUMMARY_REMOTE_THREADS: - return createThreadPool(conf.getCount(p), conf.getCount(p), 60, SECONDS, "summary remote", - emitThreadPoolMetrics); + builder = getPoolBuilder("summary remote").numCoreThreads(conf.getCount(p)).withTimeOut(60L, + MILLISECONDS); + if (emitThreadPoolMetrics) { + builder.enableThreadPoolMetrics(); + } + return builder.build(); case TSERV_SUMMARY_PARTITION_THREADS: - return createThreadPool(conf.getCount(p), conf.getCount(p), 60, SECONDS, - "summary partition", emitThreadPoolMetrics); + builder = getPoolBuilder("summary partition").numCoreThreads(conf.getCount(p)) + .withTimeOut(60L, MILLISECONDS); + if (emitThreadPoolMetrics) { + builder.enableThreadPoolMetrics(); + } + return builder.build(); case GC_DELETE_THREADS: - return createFixedThreadPool(conf.getCount(p), "deleting", emitThreadPoolMetrics); + return getPoolBuilder("deleting").numCoreThreads(conf.getCount(p)).build(); case REPLICATION_WORKER_THREADS: - return createFixedThreadPool(conf.getCount(p), "replication task", emitThreadPoolMetrics); + builder = getPoolBuilder("replication task").numCoreThreads(conf.getCount(p)); + if (emitThreadPoolMetrics) { + builder.enableThreadPoolMetrics(); + } + return builder.build(); + default: - throw new RuntimeException("Unhandled thread pool property: " + p); + throw new IllegalArgumentException("Unhandled thread pool property: " + p); } } - /** - * Create a named thread pool - * - * @param numThreads number of threads - * @param name thread pool name - * @param emitThreadPoolMetrics When set to true will emit metrics and register the metrics in a - * static registry. After the thread pool is deleted, there will still be metrics objects - * related to it in the static registry. There is no way to clean these left over objects - * up therefore its recommended that this option only be set true for long lived thread - * pools. Creating lots of short lived thread pools and registering them can lead to out of - * memory errors over long time periods. - * @return ThreadPoolExecutor - */ - public ThreadPoolExecutor createFixedThreadPool(int numThreads, final String name, - boolean emitThreadPoolMetrics) { - return createFixedThreadPool(numThreads, DEFAULT_TIMEOUT_MILLISECS, MILLISECONDS, name, - emitThreadPoolMetrics); + public ThreadPoolExecutorBuilder getPoolBuilder(@NonNull final String name) { + return new ThreadPoolExecutorBuilder(name); } - /** - * Create a named thread pool - * - * @param numThreads number of threads - * @param name thread pool name - * @param queue queue to use for tasks - * @param emitThreadPoolMetrics When set to true will emit metrics and register the metrics in a - * static registry. After the thread pool is deleted, there will still be metrics objects - * related to it in the static registry. There is no way to clean these left over objects - * up therefore its recommended that this option only be set true for long lived thread - * pools. Creating lots of short lived thread pools and registering them can lead to out of - * memory errors over long time periods. - * @return ThreadPoolExecutor - */ - public ThreadPoolExecutor createFixedThreadPool(int numThreads, final String name, - BlockingQueue<Runnable> queue, boolean emitThreadPoolMetrics) { - return createThreadPool(numThreads, numThreads, DEFAULT_TIMEOUT_MILLISECS, MILLISECONDS, name, - queue, emitThreadPoolMetrics); - } + public class ThreadPoolExecutorBuilder { + final String name; + int coreThreads = 0; + int maxThreads = -1; + long timeOut = DEFAULT_TIMEOUT_MILLISECS; + TimeUnit units = MILLISECONDS; + BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(); + OptionalInt priority = OptionalInt.empty(); + boolean emitThreadPoolMetrics = false; + + /** + * A fluent-style build to create a ThreadPoolExecutor. The name is used when creating + * named-threads for the pool. + */ + ThreadPoolExecutorBuilder(@NonNull final String name) { + this.name = name; + } - /** - * Create a named thread pool - * - * @param numThreads number of threads - * @param timeOut core thread time out - * @param units core thread time out units - * @param name thread pool name - * @param emitThreadPoolMetrics When set to true will emit metrics and register the metrics in a - * static registry. After the thread pool is deleted, there will still be metrics objects - * related to it in the static registry. There is no way to clean these left over objects - * up therefore its recommended that this option only be set true for long lived thread - * pools. Creating lots of short lived thread pools and registering them can lead to out of - * memory errors over long time periods. - * @return ThreadPoolExecutor - */ - public ThreadPoolExecutor createFixedThreadPool(int numThreads, long timeOut, TimeUnit units, - final String name, boolean emitThreadPoolMetrics) { - return createThreadPool(numThreads, numThreads, timeOut, units, name, emitThreadPoolMetrics); - } + public ThreadPoolExecutor build() { + Preconditions.checkArgument(coreThreads >= 0, + "The number of core threads must be 0 or larger"); + if (maxThreads < 0) { + // create a fixed pool with maxThread = coreThreads if core threads set. + maxThreads = coreThreads == 0 ? 1 : coreThreads; + } + Preconditions.checkArgument(maxThreads >= coreThreads, + "The number of max threads must be greater than 0 and greater than or equal to the number of core threads"); + Preconditions.checkArgument( + priority.orElse(1) >= Thread.MIN_PRIORITY && priority.orElse(1) <= Thread.MAX_PRIORITY, + "invalid thread priority, range must be Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY"); + + return createThreadPool(coreThreads, maxThreads, timeOut, units, name, queue, priority, + emitThreadPoolMetrics); + } - /** - * Create a named thread pool - * - * @param coreThreads number of threads - * @param maxThreads max number of threads - * @param timeOut core thread time out - * @param units core thread time out units - * @param name thread pool name - * @param emitThreadPoolMetrics When set to true will emit metrics and register the metrics in a - * static registry. After the thread pool is deleted, there will still be metrics objects - * related to it in the static registry. There is no way to clean these left over objects - * up therefore its recommended that this option only be set true for long lived thread - * pools. Creating lots of short lived thread pools and registering them can lead to out of - * memory errors over long time periods. - * @return ThreadPoolExecutor - */ - public ThreadPoolExecutor createThreadPool(int coreThreads, int maxThreads, long timeOut, - TimeUnit units, final String name, boolean emitThreadPoolMetrics) { - return createThreadPool(coreThreads, maxThreads, timeOut, units, name, - new LinkedBlockingQueue<>(), emitThreadPoolMetrics); - } + /** + * Set the number of coreThreads. See {@link java.util.concurrent.ThreadPoolExecutor} + * + * @param coreThreads the number of core thread, must be 0 or larger. + * @return fluent-style builder instance + */ + public ThreadPoolExecutorBuilder numCoreThreads(int coreThreads) { + this.coreThreads = coreThreads; + return this; + } - /** - * Create a named thread pool - * - * @param coreThreads number of threads - * @param maxThreads max number of threads - * @param timeOut core thread time out - * @param units core thread time out units - * @param name thread pool name - * @param queue queue to use for tasks - * @param emitThreadPoolMetrics When set to true will emit metrics and register the metrics in a - * static registry. After the thread pool is deleted, there will still be metrics objects - * related to it in the static registry. There is no way to clean these left over objects - * up therefore its recommended that this option only be set true for long lived thread - * pools. Creating lots of short lived thread pools and registering them can lead to out of - * memory errors over long time periods. - * @return ThreadPoolExecutor - */ - public ThreadPoolExecutor createThreadPool(int coreThreads, int maxThreads, long timeOut, - TimeUnit units, final String name, BlockingQueue<Runnable> queue, - boolean emitThreadPoolMetrics) { - return createThreadPool(coreThreads, maxThreads, timeOut, units, name, queue, - OptionalInt.empty(), emitThreadPoolMetrics); + /** + * Set the maximum number of threads in the pool. See + * {@link java.util.concurrent.ThreadPoolExecutor}. If the maxThreads is not set, defaults to + * the number of core threads (if set) resulting in a fixed pool. If the number of core threads + * is not set, defaults to a single thread. + * + * @param maxThreads max number of threads. Must be greater than 0 and equal or greater that the + * number of core threads. + * + * @return fluent-style builder instance + */ + public ThreadPoolExecutorBuilder numMaxThreads(int maxThreads) { + this.maxThreads = maxThreads; + return this; + } + + /** + * Set the thread keep-alive time. See {@link java.util.concurrent.ThreadPoolExecutor} + * + * @param timeOut the thread keep alive time. + * @param units the keep alive time units. + * @return fluent-style builder instance + */ + public ThreadPoolExecutorBuilder withTimeOut(long timeOut, @NonNull TimeUnit units) { + this.timeOut = timeOut; + this.units = units; + return this; + } + + /** + * Set the queue that will hold runnable tasks before execution. See + * {@link java.util.concurrent.ThreadPoolExecutor} + * + * @param queue the work queue used to hold tasks before they are executed. + * @return fluent-style builder instance + */ + public ThreadPoolExecutorBuilder withQueue(@NonNull final BlockingQueue<Runnable> queue) { + this.queue = queue; + return this; + } + + public ThreadPoolExecutorBuilder atPriority(@NonNull final OptionalInt priority) { + this.priority = priority; + return this; + } + + /** + * When set to true will emit metrics and register the metrics in a static registry. After the + * thread pool is deleted, there will still be metrics objects related to it in the static + * registry. There is no way to clean these leftover objects up therefore its recommended that + * this option only be set true for long-lived thread pools. Creating lots of short-lived thread + * pools and registering them can lead to out of memory errors over long time periods. + * + * @return a fluent-style builder instance + */ + public ThreadPoolExecutorBuilder enableThreadPoolMetrics() { + this.emitThreadPoolMetrics = true; + return this; + } } /** @@ -420,15 +491,16 @@ public class ThreadPools { * @param priority thread priority * @param emitThreadPoolMetrics When set to true will emit metrics and register the metrics in a * static registry. After the thread pool is deleted, there will still be metrics objects - * related to it in the static registry. There is no way to clean these left over objects - * up therefore its recommended that this option only be set true for long lived thread - * pools. Creating lots of short lived thread pools and registering them can lead to out of - * memory errors over long time periods. + * related to it in the static registry. There is no way to clean these leftover objects up + * therefore its recommended that this option only be set true for long-lived thread pools. + * Creating lots of short-lived thread pools and registering them can lead to out of memory + * errors over long time periods. * @return ThreadPoolExecutor */ - public ThreadPoolExecutor createThreadPool(int coreThreads, int maxThreads, long timeOut, - TimeUnit units, final String name, BlockingQueue<Runnable> queue, OptionalInt priority, - boolean emitThreadPoolMetrics) { + private ThreadPoolExecutor createThreadPool(final int coreThreads, final int maxThreads, + final long timeOut, final TimeUnit units, final String name, + final BlockingQueue<Runnable> queue, final OptionalInt priority, + final boolean emitThreadPoolMetrics) { LOG.trace( "Creating ThreadPoolExecutor for {} with {} core threads and {} max threads {} {} timeout", name, coreThreads, maxThreads, timeOut, units); @@ -436,7 +508,7 @@ public class ThreadPools { new NamedThreadFactory(name, priority, handler)) { @Override - public void execute(Runnable arg0) { + public void execute(@NonNull Runnable arg0) { super.execute(TraceUtil.wrap(arg0)); } @@ -446,17 +518,20 @@ public class ThreadPools { } @Override - public <T> Future<T> submit(Callable<T> task) { + @NonNull + public <T> Future<T> submit(@NonNull Callable<T> task) { return super.submit(TraceUtil.wrap(task)); } @Override - public <T> Future<T> submit(Runnable task, T result) { + @NonNull + public <T> Future<T> submit(@NonNull Runnable task, T result) { return super.submit(TraceUtil.wrap(task), result); } @Override - public Future<?> submit(Runnable task) { + @NonNull + public Future<?> submit(@NonNull Runnable task) { return super.submit(TraceUtil.wrap(task)); } }; @@ -481,6 +556,19 @@ public class ThreadPools { return (ScheduledThreadPoolExecutor) createExecutorService(conf, prop, true); } + /** + * Create a named ScheduledThreadPool. The pool will not be instrumented without additional + * metrics. This method should be preferred, especially for short-lived pools. + * + * @param numThreads number of threads + * @param name thread pool name + * @return ScheduledThreadPoolExecutor + */ + public ScheduledThreadPoolExecutor createScheduledExecutorService(int numThreads, + final String name) { + return createScheduledExecutorService(numThreads, name, false); + } + /** * Create a named ScheduledThreadPool * @@ -488,57 +576,66 @@ public class ThreadPools { * @param name thread pool name * @param emitThreadPoolMetrics When set to true will emit metrics and register the metrics in a * static registry. After the thread pool is deleted, there will still be metrics objects - * related to it in the static registry. There is no way to clean these left over objects - * up therefore its recommended that this option only be set true for long lived thread - * pools. Creating lots of short lived thread pools and registering them can lead to out of - * memory errors over long time periods. + * related to it in the static registry. There is no way to clean these leftover objects up + * therefore its recommended that this option only be set true for long-lived thread pools. + * Creating lots of short-lived thread pools and registering them can lead to out of memory + * errors over long time periods. * @return ScheduledThreadPoolExecutor */ - public ScheduledThreadPoolExecutor createScheduledExecutorService(int numThreads, + private ScheduledThreadPoolExecutor createScheduledExecutorService(int numThreads, final String name, boolean emitThreadPoolMetrics) { LOG.trace("Creating ScheduledThreadPoolExecutor for {} with {} threads", name, numThreads); var result = new ScheduledThreadPoolExecutor(numThreads, new NamedThreadFactory(name, handler)) { @Override - public void execute(Runnable command) { + public void execute(@NonNull Runnable command) { super.execute(TraceUtil.wrap(command)); } @Override - public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { + @NonNull + public <V> ScheduledFuture<V> schedule(@NonNull Callable<V> callable, long delay, + @NonNull TimeUnit unit) { return super.schedule(TraceUtil.wrap(callable), delay, unit); } @Override - public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { + @NonNull + public ScheduledFuture<?> schedule(@NonNull Runnable command, long delay, + @NonNull TimeUnit unit) { return super.schedule(TraceUtil.wrap(command), delay, unit); } @Override - public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, - long period, TimeUnit unit) { + @NonNull + public ScheduledFuture<?> scheduleAtFixedRate(@NonNull Runnable command, + long initialDelay, long period, @NonNull TimeUnit unit) { return super.scheduleAtFixedRate(TraceUtil.wrap(command), initialDelay, period, unit); } @Override - public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, - long delay, TimeUnit unit) { + @NonNull + public ScheduledFuture<?> scheduleWithFixedDelay(@NonNull Runnable command, + long initialDelay, long delay, @NonNull TimeUnit unit) { return super.scheduleWithFixedDelay(TraceUtil.wrap(command), initialDelay, delay, unit); } @Override - public <T> Future<T> submit(Callable<T> task) { + @NonNull + public <T> Future<T> submit(@NonNull Callable<T> task) { return super.submit(TraceUtil.wrap(task)); } @Override - public <T> Future<T> submit(Runnable task, T result) { + @NonNull + public <T> Future<T> submit(@NonNull Runnable task, T result) { return super.submit(TraceUtil.wrap(task), result); } @Override - public Future<?> submit(Runnable task) { + @NonNull + public Future<?> submit(@NonNull Runnable task) { return super.submit(TraceUtil.wrap(task)); } diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java index 6f69dc4e94..ea0c4ceabe 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java @@ -19,7 +19,7 @@ package org.apache.accumulo.core.file.rfile; import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.concurrent.TimeUnit.MINUTES; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -232,8 +232,9 @@ public class MultiThreadedRFileTest { // now start up multiple RFile deepcopies int maxThreads = 10; String name = "MultiThreadedRFileTestThread"; - ThreadPoolExecutor pool = ThreadPools.getServerThreadPools().createThreadPool(maxThreads + 1, - maxThreads + 1, 5 * 60, SECONDS, name, false); + ThreadPoolExecutor pool = + ThreadPools.getServerThreadPools().getPoolBuilder(name).numCoreThreads(maxThreads + 1) + .numMaxThreads(maxThreads + 1).withTimeOut(5, MINUTES).build(); try { Runnable runnable = () -> { try { diff --git a/core/src/test/java/org/apache/accumulo/core/util/threads/ThreadPoolExecutorBuilderTest.java b/core/src/test/java/org/apache/accumulo/core/util/threads/ThreadPoolExecutorBuilderTest.java new file mode 100644 index 0000000000..5146ccf5b2 --- /dev/null +++ b/core/src/test/java/org/apache/accumulo/core/util/threads/ThreadPoolExecutorBuilderTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.util.threads; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.MINUTES; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import org.junit.jupiter.api.Test; + +public class ThreadPoolExecutorBuilderTest { + + private final ThreadPools serverPool = ThreadPools.getServerThreadPools(); + + @Test + public void builderDefaultsTest() { + var p = serverPool.getPoolBuilder("defaults").build(); + assertEquals(0, p.getCorePoolSize()); + assertEquals(1, p.getMaximumPoolSize()); + assertEquals(3L, p.getKeepAliveTime(MINUTES)); + } + + @Test + public void builderInvalidNumCoreTest() { + assertThrows(IllegalArgumentException.class, + () -> serverPool.getPoolBuilder("test1").numCoreThreads(-1).build()); + } + + @Test + public void builderInvalidNumMaxThreadsTest() { + // max threads must be > core threads + assertThrows(IllegalArgumentException.class, + () -> serverPool.getPoolBuilder("test1").numCoreThreads(2).numMaxThreads(1).build()); + } + + @Test + public void builderPoolCoreMaxTest() { + var p = serverPool.getPoolBuilder("test1").numCoreThreads(1).numMaxThreads(2).build(); + assertEquals(1, p.getCorePoolSize()); + assertEquals(2, p.getMaximumPoolSize()); + } + + @Test + public void builderFixedPoolTest() { + var p = serverPool.getPoolBuilder("test1").numCoreThreads(2).build(); + assertEquals(2, p.getCorePoolSize()); + assertEquals(2, p.getMaximumPoolSize()); + } + + @Test + public void buildeSetTimeoutTest() { + var p = serverPool.getPoolBuilder("test1").withTimeOut(0L, MILLISECONDS).build(); + assertEquals(0, p.getCorePoolSize()); + assertEquals(1, p.getMaximumPoolSize()); + assertEquals(0L, p.getKeepAliveTime(MINUTES)); + + var p2 = serverPool.getPoolBuilder("test1").withTimeOut(123L, MILLISECONDS).build(); + assertEquals(0, p2.getCorePoolSize()); + assertEquals(1, p2.getMaximumPoolSize()); + assertEquals(123L, p2.getKeepAliveTime(MILLISECONDS)); + } +} diff --git a/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java index d1ac02443f..683ffaa058 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java +++ b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java @@ -135,7 +135,7 @@ public class BulkImporter { timer.start(Timers.EXAMINE_MAP_FILES); ExecutorService threadPool = ThreadPools.getServerThreadPools() - .createFixedThreadPool(numThreads, "findOverlapping", false); + .getPoolBuilder("findOverlapping").numCoreThreads(numThreads).build(); for (Path path : paths) { final Path mapFile = path; @@ -362,8 +362,8 @@ public class BulkImporter { final Map<Path,List<AssignmentInfo>> ais = Collections.synchronizedMap(new TreeMap<>()); - ExecutorService threadPool = ThreadPools.getServerThreadPools() - .createFixedThreadPool(numThreads, "estimateSizes", false); + ExecutorService threadPool = ThreadPools.getServerThreadPools().getPoolBuilder("estimateSizes") + .numCoreThreads(numThreads).build(); for (final Entry<Path,List<TabletLocation>> entry : assignments.entrySet()) { if (entry.getValue().size() == 1) { @@ -552,8 +552,8 @@ public class BulkImporter { } }); - ExecutorService threadPool = - ThreadPools.getServerThreadPools().createFixedThreadPool(numThreads, "submit", false); + ExecutorService threadPool = ThreadPools.getServerThreadPools().getPoolBuilder("submit") + .numCoreThreads(numThreads).build(); for (Entry<String,Map<KeyExtent,List<PathSize>>> entry : assignmentsPerTabletServer .entrySet()) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java b/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java index e5ac4945a3..d59abbbd3f 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java +++ b/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java @@ -191,8 +191,8 @@ public class ServerConfigurationFactory extends ServerConfiguration { Runnable refreshTask = this::verifySnapshotVersions; - ScheduledThreadPoolExecutor executor = ThreadPools.getServerThreadPools() - .createScheduledExecutorService(1, "config-refresh", false); + ScheduledThreadPoolExecutor executor = + ThreadPools.getServerThreadPools().createScheduledExecutorService(1, "config-refresh"); // scheduleWithFixedDelay - used so only one task will run concurrently. // staggering the initial delay prevents synchronization of Accumulo servers communicating diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropCacheCaffeineImpl.java b/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropCacheCaffeineImpl.java index 48eb1fa181..5c42d0c0a3 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropCacheCaffeineImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropCacheCaffeineImpl.java @@ -18,6 +18,8 @@ */ package org.apache.accumulo.server.conf.store.impl; +import static java.util.concurrent.TimeUnit.SECONDS; + import java.util.Objects; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; @@ -42,8 +44,9 @@ public class PropCacheCaffeineImpl implements PropCache { public static final int EXPIRE_MIN = 60; private static final Logger log = LoggerFactory.getLogger(PropCacheCaffeineImpl.class); - private static final Executor executor = ThreadPools.getServerThreadPools().createThreadPool(1, - 20, 60, TimeUnit.SECONDS, "caffeine-tasks", false); + private static final Executor executor = + ThreadPools.getServerThreadPools().getPoolBuilder("caffeine-tasks").numCoreThreads(1) + .numMaxThreads(20).withTimeOut(60L, SECONDS).build(); private final PropStoreMetrics metrics; diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropStoreWatcher.java b/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropStoreWatcher.java index a58ea4db98..952409a2bb 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropStoreWatcher.java +++ b/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropStoreWatcher.java @@ -57,9 +57,8 @@ public class PropStoreWatcher implements Watcher { private static final Logger log = LoggerFactory.getLogger(PropStoreWatcher.class); - private static final ExecutorService executorService = - ThreadPools.getServerThreadPools().createFixedThreadPool(2, "zoo_change_update", false); - + private static final ExecutorService executorService = ThreadPools.getServerThreadPools() + .getPoolBuilder("zoo_change_update").numCoreThreads(2).build(); private final ReentrantReadWriteLock listenerLock = new ReentrantReadWriteLock(); private final ReentrantReadWriteLock.ReadLock listenerReadLock = listenerLock.readLock(); private final ReentrantReadWriteLock.WriteLock listenerWriteLock = listenerLock.writeLock(); diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java index 51ba4d4c9e..0fa7d7b7d3 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java @@ -323,8 +323,8 @@ public class VolumeManagerImpl implements VolumeManager { public void bulkRename(Map<Path,Path> oldToNewPathMap, int poolSize, String poolName, String transactionId) throws IOException { List<Future<Void>> results = new ArrayList<>(); - ExecutorService workerPool = - ThreadPools.getServerThreadPools().createFixedThreadPool(poolSize, poolName, false); + ExecutorService workerPool = ThreadPools.getServerThreadPools().getPoolBuilder(poolName) + .numCoreThreads(poolSize).build(); oldToNewPathMap.forEach((oldPath, newPath) -> results.add(workerPool.submit(() -> { boolean success; try { diff --git a/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java b/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java index cdbac5fc9f..2c04fc9e49 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java +++ b/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java @@ -18,6 +18,8 @@ */ package org.apache.accumulo.server.problems; +import static java.util.concurrent.TimeUnit.SECONDS; + import java.util.Collections; import java.util.EnumMap; import java.util.Iterator; @@ -65,8 +67,9 @@ public class ProblemReports implements Iterable<ProblemReport> { * processed because the whole system is in a really bad state (like HDFS is down) and everything * is reporting lots of problems, but problem reports can not be processed */ - private ExecutorService reportExecutor = ThreadPools.getServerThreadPools().createThreadPool(0, 1, - 60, TimeUnit.SECONDS, "acu-problem-reporter", new LinkedBlockingQueue<>(500), false); + private final ExecutorService reportExecutor = ThreadPools.getServerThreadPools() + .getPoolBuilder("acu-problem-reporter").numCoreThreads(0).numMaxThreads(1) + .withTimeOut(60L, SECONDS).withQueue(new LinkedBlockingQueue<>(500)).build(); private final ServerContext context; diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java index 3a72250268..8ae472cf8b 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java @@ -19,6 +19,7 @@ package org.apache.accumulo.server.rpc; import static com.google.common.base.Preconditions.checkArgument; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import java.io.IOException; import java.net.InetAddress; @@ -32,7 +33,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -307,8 +307,9 @@ public class TServerUtils { private static ThreadPoolExecutor createSelfResizingThreadPool(final String serverName, final int executorThreads, long threadTimeOut, final AccumuloConfiguration conf, long timeBetweenThreadChecks) { - final ThreadPoolExecutor pool = ThreadPools.getServerThreadPools().createFixedThreadPool( - executorThreads, threadTimeOut, TimeUnit.MILLISECONDS, serverName + "-ClientPool", true); + final ThreadPoolExecutor pool = ThreadPools.getServerThreadPools() + .getPoolBuilder(serverName + "-ClientPool").numCoreThreads(executorThreads) + .withTimeOut(threadTimeOut, MILLISECONDS).enableThreadPoolMetrics().build(); // periodically adjust the number of threads we need by checking how busy our threads are ThreadPools.watchCriticalFixedDelay(conf, timeBetweenThreadChecks, () -> { // there is a minor race condition between sampling the current state of the thread pool diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java b/server/base/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java index bce5912463..18de10604c 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java @@ -121,8 +121,8 @@ public class RemoveEntriesForMissingFiles { Map<Path,Path> cache = new LRUMap<>(100000); Set<Path> processing = new HashSet<>(); - ExecutorService threadPool = - ThreadPools.getServerThreadPools().createFixedThreadPool(16, "CheckFileTasks", false); + ExecutorService threadPool = ThreadPools.getServerThreadPools().getPoolBuilder("CheckFileTasks") + .numCoreThreads(16).build(); System.out.printf("Scanning : %s %s\n", tableName, range); diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java b/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java index 3861ddf0ca..f1201830a3 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java @@ -117,8 +117,9 @@ public class VerifyTabletAssignments { } } - ExecutorService tp = - ThreadPools.getServerThreadPools().createFixedThreadPool(20, "CheckTabletServer", false); + ExecutorService tp = ThreadPools.getServerThreadPools().getPoolBuilder("CheckTabletServer") + .numCoreThreads(20).build(); + for (final Entry<HostAndPort,List<KeyExtent>> entry : extentsPerServer.entrySet()) { Runnable r = () -> { try { diff --git a/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/ReadyMonitorTest.java b/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/ReadyMonitorTest.java index bf61c02ab9..e52b39a419 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/ReadyMonitorTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/ReadyMonitorTest.java @@ -60,8 +60,8 @@ public class ReadyMonitorTest { // these tests wait for workers to signal ready using count down latch. // size pool so some threads are likely to wait on others to complete. int numPoolThreads = numWorkerThreads / 2; - workerPool = ThreadPools.getServerThreadPools().createFixedThreadPool(numPoolThreads, - "readyMonitor-test-pool", false); + workerPool = ThreadPools.getServerThreadPools().getPoolBuilder("readyMonitor-test-pool") + .numCoreThreads(numPoolThreads).build(); } @AfterEach diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index b0ec498a9e..75690df777 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@ -348,8 +348,8 @@ public class CompactionCoordinator extends AbstractServer } private void updateSummaries() { - ExecutorService executor = ThreadPools.getServerThreadPools().createFixedThreadPool(10, - "Compaction Summary Gatherer", false); + ExecutorService executor = ThreadPools.getServerThreadPools() + .getPoolBuilder("Compaction Summary Gatherer").numCoreThreads(10).build(); try { Set<String> queuesSeen = new ConcurrentSkipListSet<>(); diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java index 5c259b13f4..45b6161bab 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.coordinator; +import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.function.Function.identity; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toMap; @@ -74,11 +75,13 @@ public class CompactionFinalizer { int max = this.context.getConfiguration() .getCount(Property.COMPACTION_COORDINATOR_FINALIZER_TSERVER_NOTIFIER_MAXTHREADS); - this.ntfyExecutor = ThreadPools.getServerThreadPools().createThreadPool(3, max, 1, - TimeUnit.MINUTES, "Compaction Finalizer Notifier", true); + this.ntfyExecutor = ThreadPools.getServerThreadPools() + .getPoolBuilder("Compaction Finalizer Notifier").numCoreThreads(3).numMaxThreads(max) + .withTimeOut(1L, MINUTES).enableThreadPoolMetrics().build(); - this.backgroundExecutor = ThreadPools.getServerThreadPools().createFixedThreadPool(1, - "Compaction Finalizer Background Task", true); + this.backgroundExecutor = + ThreadPools.getServerThreadPools().getPoolBuilder("Compaction Finalizer Background Task") + .numCoreThreads(1).enableThreadPoolMetrics().build(); backgroundExecutor.execute(() -> { processPending(); diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java b/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java index c03d4496ca..cd1753b67f 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java @@ -292,7 +292,7 @@ public class GCRun implements GarbageCollectionEnvironment { minimizeDeletes(confirmedDeletes, processedDeletes, fs, log); ExecutorService deleteThreadPool = ThreadPools.getServerThreadPools() - .createExecutorService(config, Property.GC_DELETE_THREADS, false); + .createExecutorService(config, Property.GC_DELETE_THREADS); final List<Pair<Path,Path>> replacements = context.getVolumeReplacements(); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 1b63f18381..b81e0c7456 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -965,7 +965,7 @@ public class Manager extends AbstractServer final long rpcTimeout = getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT); int threads = getConfiguration().getCount(Property.MANAGER_STATUS_THREAD_POOL_SIZE); ExecutorService tp = ThreadPools.getServerThreadPools() - .createExecutorService(getConfiguration(), Property.MANAGER_STATUS_THREAD_POOL_SIZE, false); + .createExecutorService(getConfiguration(), Property.MANAGER_STATUS_THREAD_POOL_SIZE); long start = System.currentTimeMillis(); final SortedMap<TServerInstance,TabletServerStatus> result = new ConcurrentSkipListMap<>(); final RateLimiter shutdownServerRateLimiter = RateLimiter.create(MAX_SHUTDOWNS_PER_SEC); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ReplicationMetrics.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ReplicationMetrics.java index 77891ad49c..a5e9213948 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ReplicationMetrics.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ReplicationMetrics.java @@ -163,7 +163,7 @@ public class ReplicationMetrics implements MetricsProducer { new AtomicInteger(0)); ScheduledExecutorService scheduler = ThreadPools.getServerThreadPools() - .createScheduledExecutorService(1, "replicationMetricsPoller", false); + .createScheduledExecutorService(1, "replicationMetricsPoller"); Runtime.getRuntime().addShutdownHook(new Thread(scheduler::shutdownNow)); long minimumRefreshDelay = TimeUnit.SECONDS.toMillis(5); ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(this::update, minimumRefreshDelay, diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java index 758884d5b4..800455fe0e 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java @@ -152,8 +152,8 @@ public class FateMetrics implements MetricsProducer { update(); // get fate status is read only operation - no reason to be nice on shutdown. - ScheduledExecutorService scheduler = ThreadPools.getServerThreadPools() - .createScheduledExecutorService(1, "fateMetricsPoller", false); + ScheduledExecutorService scheduler = + ThreadPools.getServerThreadPools().createScheduledExecutorService(1, "fateMetricsPoller"); Runtime.getRuntime().addShutdownHook(new Thread(scheduler::shutdownNow)); ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java b/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java index 3f8261a480..f405881cb6 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java @@ -73,8 +73,8 @@ public class RecoveryManager { CacheBuilder.newBuilder().expireAfterWrite(timeToCacheExistsInMillis, TimeUnit.MILLISECONDS) .maximumWeight(10_000_000).weigher((path, exist) -> path.toString().length()).build(); - executor = ThreadPools.getServerThreadPools().createScheduledExecutorService(4, - "Walog sort starter", false); + executor = + ThreadPools.getServerThreadPools().createScheduledExecutorService(4, "Walog sort starter"); zooCache = new ZooCache(manager.getContext().getZooReader(), null); try { List<String> workIDs = diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer1/BulkImport.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer1/BulkImport.java index b07f7c79ab..ee0694fd56 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer1/BulkImport.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer1/BulkImport.java @@ -208,9 +208,9 @@ public class BulkImport extends ManagerRepo { AccumuloConfiguration serverConfig = manager.getConfiguration(); @SuppressWarnings("deprecation") - ExecutorService workers = ThreadPools.getServerThreadPools().createExecutorService(serverConfig, - serverConfig.resolve(Property.MANAGER_RENAME_THREADS, Property.MANAGER_BULK_RENAME_THREADS), - false); + ExecutorService workers = + ThreadPools.getServerThreadPools().createExecutorService(serverConfig, serverConfig + .resolve(Property.MANAGER_RENAME_THREADS, Property.MANAGER_BULK_RENAME_THREADS)); List<Future<Exception>> results = new ArrayList<>(); for (FileStatus file : mapFiles) { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java index d62e824e05..11cb713b58 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java @@ -18,6 +18,8 @@ */ package org.apache.accumulo.manager.upgrade; +import static java.util.concurrent.TimeUnit.SECONDS; + import java.io.IOException; import java.util.Collections; import java.util.Map; @@ -25,7 +27,6 @@ import java.util.TreeMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.AccumuloException; @@ -193,9 +194,9 @@ public class UpgradeCoordinator { "Not currently in a suitable state to do metadata upgrade %s", status); if (currentVersion < AccumuloDataVersion.get()) { - return ThreadPools.getServerThreadPools().createThreadPool(0, Integer.MAX_VALUE, 60L, - TimeUnit.SECONDS, "UpgradeMetadataThreads", new SynchronousQueue<>(), false) - .submit(() -> { + return ThreadPools.getServerThreadPools().getPoolBuilder("UpgradeMetadataThreads") + .numCoreThreads(0).numMaxThreads(Integer.MAX_VALUE).withTimeOut(60L, SECONDS) + .withQueue(new SynchronousQueue<>()).build().submit(() -> { try { for (int v = currentVersion; v < AccumuloDataVersion.get(); v++) { log.info("Upgrading Root from data version {}", v); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index e41b99db97..8e9013fdc5 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -985,7 +985,7 @@ public class TabletServer extends AbstractServer implements TabletHostingServer // Start the pool to handle outgoing replications final ThreadPoolExecutor replicationThreadPool = ThreadPools.getServerThreadPools() - .createExecutorService(getConfiguration(), Property.REPLICATION_WORKER_THREADS, false); + .createExecutorService(getConfiguration(), Property.REPLICATION_WORKER_THREADS); replWorker.setExecutor(replicationThreadPool); replWorker.run(); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java index 3b14ab3728..3d80f9c1ed 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java @@ -19,6 +19,8 @@ package org.apache.accumulo.tserver; import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.toUnmodifiableMap; import static org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly; @@ -134,7 +136,7 @@ public class TabletServerResourceManager { private void modifyThreadPoolSizesAtRuntime(IntSupplier maxThreads, String name, final ThreadPoolExecutor tp) { ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor().scheduleWithFixedDelay( - () -> ThreadPools.resizePool(tp, maxThreads, name), 1, 10, TimeUnit.SECONDS)); + () -> ThreadPools.resizePool(tp, maxThreads, name), 1, 10, SECONDS)); } private ThreadPoolExecutor createPriorityExecutor(ScanExecutorConfig sec, @@ -183,9 +185,10 @@ public class TabletServerResourceManager { scanExecQueues.put(sec.name, queue); - ThreadPoolExecutor es = ThreadPools.getServerThreadPools().createThreadPool( - sec.getCurrentMaxThreads(), sec.getCurrentMaxThreads(), 0L, TimeUnit.MILLISECONDS, - "scan-" + sec.name, queue, sec.priority, true); + ThreadPoolExecutor es = ThreadPools.getServerThreadPools().getPoolBuilder("scan-" + sec.name) + .numCoreThreads(sec.getCurrentMaxThreads()).numMaxThreads(sec.getCurrentMaxThreads()) + .withTimeOut(0L, MILLISECONDS).withQueue(queue).atPriority(sec.priority) + .enableThreadPoolMetrics().build(); modifyThreadPoolSizesAtRuntime(sec::getCurrentMaxThreads, "scan-" + sec.name, es); return es; @@ -304,14 +307,17 @@ public class TabletServerResourceManager { () -> context.getConfiguration().getCount(Property.TSERV_MINC_MAXCONCURRENT), "minor compactor", minorCompactionThreadPool); - splitThreadPool = ThreadPools.getServerThreadPools().createThreadPool(0, 1, 1, TimeUnit.SECONDS, - "splitter", true); + splitThreadPool = + ThreadPools.getServerThreadPools().getPoolBuilder("splitter").numCoreThreads(0) + .numMaxThreads(1).withTimeOut(1, SECONDS).enableThreadPoolMetrics().build(); - defaultSplitThreadPool = ThreadPools.getServerThreadPools().createThreadPool(0, 1, 60, - TimeUnit.SECONDS, "md splitter", true); + defaultSplitThreadPool = + ThreadPools.getServerThreadPools().getPoolBuilder("md splitter").numCoreThreads(0) + .numMaxThreads(1).withTimeOut(60, SECONDS).enableThreadPoolMetrics().build(); - defaultMigrationPool = ThreadPools.getServerThreadPools().createThreadPool(0, 1, 60, - TimeUnit.SECONDS, "metadata tablet migration", true); + defaultMigrationPool = ThreadPools.getServerThreadPools() + .getPoolBuilder("metadata tablet migration").numCoreThreads(0).numMaxThreads(1) + .withTimeOut(60, SECONDS).enableThreadPoolMetrics().build(); migrationPool = ThreadPools.getServerThreadPools().createExecutorService(acuConf, Property.TSERV_MIGRATE_MAXCONCURRENT, true); @@ -330,8 +336,9 @@ public class TabletServerResourceManager { () -> context.getConfiguration().getCount(Property.TSERV_ASSIGNMENT_MAXCONCURRENT), "tablet assignment", assignmentPool); - assignMetaDataPool = ThreadPools.getServerThreadPools().createThreadPool(0, 1, 60, - TimeUnit.SECONDS, "metadata tablet assignment", true); + assignMetaDataPool = ThreadPools.getServerThreadPools() + .getPoolBuilder("metadata tablet assignment").numCoreThreads(0).numMaxThreads(1) + .withTimeOut(60, SECONDS).enableThreadPoolMetrics().build(); activeAssignments = new ConcurrentHashMap<>(); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java index 78f5fc1173..3054db17ea 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.tserver.compactions; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MINUTES; import java.util.Collection; @@ -35,7 +36,6 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import java.util.function.Function; @@ -132,8 +132,8 @@ public class CompactionService { this.executors = Map.copyOf(tmpExecutors); - this.planningExecutor = ThreadPools.getServerThreadPools().createThreadPool(1, 1, 0L, - TimeUnit.MILLISECONDS, "CompactionPlanner", false); + this.planningExecutor = ThreadPools.getServerThreadPools().getPoolBuilder("CompactionPlanner") + .numCoreThreads(1).numMaxThreads(1).withTimeOut(0L, MILLISECONDS).build(); this.queuedForPlanning = new EnumMap<>(CompactionKind.class); for (CompactionKind kind : CompactionKind.values()) { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java index ffdebedca9..229f53ca77 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java @@ -18,6 +18,8 @@ */ package org.apache.accumulo.tserver.compactions; +import static java.util.concurrent.TimeUnit.SECONDS; + import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -27,7 +29,6 @@ import java.util.Set; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -172,9 +173,9 @@ public class InternalCompactionExecutor implements CompactionExecutor { queue = new PriorityBlockingQueue<>(100, comparator); - threadPool = ThreadPools.getServerThreadPools().createThreadPool(threads, threads, 60, - TimeUnit.SECONDS, "compaction." + ceid, queue, false); - + threadPool = ThreadPools.getServerThreadPools().getPoolBuilder("compaction." + ceid) + .numCoreThreads(threads).numMaxThreads(threads).withTimeOut(60L, SECONDS).withQueue(queue) + .build(); metricCloser = ceMetrics.addExecutor(ceid, () -> threadPool.getActiveCount(), () -> queuedJob.size()); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java index 8884f398a8..6fc396e4f4 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java @@ -296,8 +296,9 @@ public class LogSorter { @SuppressWarnings("deprecation") int threadPoolSize = this.conf.getCount(this.conf .resolve(Property.TSERV_WAL_SORT_MAX_CONCURRENT, Property.TSERV_RECOVERY_MAX_CONCURRENT)); - ThreadPoolExecutor threadPool = ThreadPools.getServerThreadPools() - .createFixedThreadPool(threadPoolSize, this.getClass().getName(), true); + ThreadPoolExecutor threadPool = + ThreadPools.getServerThreadPools().getPoolBuilder(this.getClass().getName()) + .numCoreThreads(threadPoolSize).enableThreadPoolMetrics().build(); new DistributedWorkQueue(context.getZooKeeperRoot() + Constants.ZRECOVERY, sortedLogConf, context).startProcessing(new LogProcessor(), threadPool); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java index 2e5bc3e4b1..346b70166c 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java @@ -262,8 +262,8 @@ public class TabletServerLogger { if (nextLogMaker != null) { return; } - nextLogMaker = - ThreadPools.getServerThreadPools().createFixedThreadPool(1, "WALog creator", true); + nextLogMaker = ThreadPools.getServerThreadPools().getPoolBuilder("WALog creator") + .numCoreThreads(1).enableThreadPoolMetrics().build(); nextLogMaker.execute(new Runnable() { @Override public void run() { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/CompactionExecutorsMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/CompactionExecutorsMetrics.java index dfb3f87e25..e3c6bb5150 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/CompactionExecutorsMetrics.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/CompactionExecutorsMetrics.java @@ -77,7 +77,7 @@ public class CompactionExecutorsMetrics implements MetricsProducer { protected void startUpdateThread() { ScheduledExecutorService scheduler = ThreadPools.getServerThreadPools() - .createScheduledExecutorService(1, "compactionExecutorsMetricsPoller", false); + .createScheduledExecutorService(1, "compactionExecutorsMetricsPoller"); Runtime.getRuntime().addShutdownHook(new Thread(scheduler::shutdownNow)); long minimumRefreshDelay = TimeUnit.SECONDS.toMillis(5); ThreadPools.watchNonCriticalScheduledTask(scheduler.scheduleAtFixedRate(this::update, diff --git a/test/src/main/java/org/apache/accumulo/test/BalanceWithOfflineTableIT.java b/test/src/main/java/org/apache/accumulo/test/BalanceWithOfflineTableIT.java index 2f45208e26..40c1487e58 100644 --- a/test/src/main/java/org/apache/accumulo/test/BalanceWithOfflineTableIT.java +++ b/test/src/main/java/org/apache/accumulo/test/BalanceWithOfflineTableIT.java @@ -78,8 +78,9 @@ public class BalanceWithOfflineTableIT extends ConfigurableMacBase { log.info("Waiting for balance"); - ExecutorService pool = - ThreadPools.getServerThreadPools().createFixedThreadPool(1, "waitForBalance", false); + ExecutorService pool = ThreadPools.getServerThreadPools().getPoolBuilder("waitForBalance") + .numCoreThreads(1).build(); + Future<Boolean> wait = pool.submit(() -> { c.instanceOperations().waitForBalance(); return true; diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java index 80e422e6d6..101c9fc65e 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java @@ -213,7 +213,7 @@ public class BatchWriterFlushIT extends AccumuloClusterHarness { } ThreadPoolExecutor threads = ThreadPools.getServerThreadPools() - .createFixedThreadPool(NUM_THREADS, "ClientThreads", false); + .getPoolBuilder("ClientThreads").numCoreThreads(NUM_THREADS).build(); threads.allowCoreThreadTimeOut(false); threads.prestartAllCoreThreads();