This is an automated email from the ASF dual-hosted git repository. ctubbsii pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 0fdfee5322278d8833aec541d47bf778489394ac Author: Christopher Tubbs <ctubb...@apache.org> AuthorDate: Mon Jan 31 04:17:09 2022 -0500 Minor cleanup of ThreadPools * Avoid casting to ThreadPoolExecutor by using that type directly instead of its superclass * Avoid passing unnecessary queue and priority parameters by leveraging more of the overloaded createThreadPool methods --- .../accumulo/core/clientImpl/ScannerIterator.java | 7 +- .../accumulo/core/util/threads/ThreadPools.java | 175 +++++++++++---------- .../main/java/org/apache/accumulo/fate/Fate.java | 4 +- .../core/file/rfile/MultiThreadedRFileTest.java | 4 +- .../accumulo/server/problems/ProblemReports.java | 3 +- .../manager/upgrade/UpgradeCoordinator.java | 4 +- .../org/apache/accumulo/tserver/TabletServer.java | 6 +- .../tserver/TabletServerResourceManager.java | 50 +++--- .../compactions/InternalCompactionExecutor.java | 3 +- 9 files changed, 125 insertions(+), 131 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java index f8d3912..99485c2 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java @@ -23,7 +23,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map.Entry; import java.util.NoSuchElementException; -import java.util.OptionalInt; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.SynchronousQueue; @@ -65,9 +64,9 @@ public class ScannerIterator implements Iterator<Entry<Key,Value>> { private ScannerImpl.Reporter reporter; - private static ThreadPoolExecutor readaheadPool = ThreadPools.createThreadPool(0, - Integer.MAX_VALUE, 3L, TimeUnit.SECONDS, "Accumulo scanner read ahead thread", - new SynchronousQueue<>(), OptionalInt.empty(), true); + private static ThreadPoolExecutor readaheadPool = + ThreadPools.createThreadPool(0, Integer.MAX_VALUE, 3L, TimeUnit.SECONDS, + "Accumulo scanner read ahead thread", new SynchronousQueue<>(), true); private boolean closed = false; diff --git a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java index 25807f6..ac4ed3d 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java +++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java @@ -21,7 +21,6 @@ package org.apache.accumulo.core.util.threads; import java.util.OptionalInt; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledFuture; @@ -110,7 +109,7 @@ public class ThreadPools { * if property is not handled */ @SuppressWarnings("deprecation") - public static ExecutorService createExecutorService(final AccumuloConfiguration conf, + public static ThreadPoolExecutor createExecutorService(final AccumuloConfiguration conf, final Property p, boolean emitThreadPoolMetrics) { switch (p) { @@ -129,8 +128,7 @@ public class ThreadPools { int threads = conf.getCount(p); if (threads == 0) { return createThreadPool(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, - "GatherTableInformation", new SynchronousQueue<Runnable>(), OptionalInt.empty(), - emitThreadPoolMetrics); + "GatherTableInformation", new SynchronousQueue<>(), emitThreadPoolMetrics); } else { return createFixedThreadPool(threads, "GatherTableInformation", emitThreadPoolMetrics); } @@ -207,7 +205,7 @@ public class ThreadPools { public static ThreadPoolExecutor createFixedThreadPool(int numThreads, final String name, BlockingQueue<Runnable> queue, boolean emitThreadPoolMetrics) { return createThreadPool(numThreads, numThreads, DEFAULT_TIMEOUT_MILLISECS, - TimeUnit.MILLISECONDS, name, queue, OptionalInt.empty(), emitThreadPoolMetrics); + TimeUnit.MILLISECONDS, name, queue, emitThreadPoolMetrics); } /** @@ -232,8 +230,7 @@ public class ThreadPools { */ public static ThreadPoolExecutor createFixedThreadPool(int numThreads, long timeOut, TimeUnit units, final String name, boolean emitThreadPoolMetrics) { - return createThreadPool(numThreads, numThreads, timeOut, units, name, - new LinkedBlockingQueue<Runnable>(), OptionalInt.empty(), emitThreadPoolMetrics); + return createThreadPool(numThreads, numThreads, timeOut, units, name, emitThreadPoolMetrics); } /** @@ -261,7 +258,38 @@ public class ThreadPools { public static ThreadPoolExecutor createThreadPool(int coreThreads, int maxThreads, long timeOut, TimeUnit units, final String name, boolean emitThreadPoolMetrics) { return createThreadPool(coreThreads, maxThreads, timeOut, units, name, - new LinkedBlockingQueue<Runnable>(), OptionalInt.empty(), emitThreadPoolMetrics); + new LinkedBlockingQueue<>(), emitThreadPoolMetrics); + } + + /** + * Create a named thread pool + * + * @param coreThreads + * number of threads + * @param maxThreads + * max number of threads + * @param timeOut + * core thread time out + * @param units + * core thread time out units + * @param name + * thread pool name + * @param queue + * queue to use for tasks + * @param emitThreadPoolMetrics + * When set to true will emit metrics and register the metrics in a static registry. + * After the thread pool is deleted, there will still be metrics objects related to it in + * the static registry. There is no way to clean these left over objects up therefore its + * recommended that this option only be set true for long lived thread pools. Creating + * lots of short lived thread pools and registering them can lead to out of memory errors + * over long time periods. + * @return ThreadPoolExecutor + */ + public static ThreadPoolExecutor createThreadPool(int coreThreads, int maxThreads, long timeOut, + TimeUnit units, final String name, BlockingQueue<Runnable> queue, + boolean emitThreadPoolMetrics) { + return createThreadPool(coreThreads, maxThreads, timeOut, units, name, queue, + OptionalInt.empty(), emitThreadPoolMetrics); } /** @@ -293,8 +321,8 @@ public class ThreadPools { public static ThreadPoolExecutor createThreadPool(int coreThreads, int maxThreads, long timeOut, TimeUnit units, final String name, BlockingQueue<Runnable> queue, OptionalInt priority, boolean emitThreadPoolMetrics) { - ThreadPoolExecutor result = new ThreadPoolExecutor(coreThreads, maxThreads, timeOut, units, - queue, new NamedThreadFactory(name, priority)) { + var result = new ThreadPoolExecutor(coreThreads, maxThreads, timeOut, units, queue, + new NamedThreadFactory(name, priority)) { @Override public void execute(Runnable arg0) { @@ -358,83 +386,58 @@ public class ThreadPools { */ public static ScheduledThreadPoolExecutor createScheduledExecutorService(int numThreads, final String name, boolean emitThreadPoolMetrics) { - return createScheduledExecutorService(numThreads, name, OptionalInt.empty(), - emitThreadPoolMetrics); - } + var result = new ScheduledThreadPoolExecutor(numThreads, new NamedThreadFactory(name)) { - /** - * Create a named ScheduledThreadPool - * - * @param numThreads - * number of threads - * @param name - * thread pool name - * @param priority - * thread priority - * @param emitThreadPoolMetrics - * When set to true will emit metrics and register the metrics in a static registry. - * After the thread pool is deleted, there will still be metrics objects related to it in - * the static registry. There is no way to clean these left over objects up therefore its - * recommended that this option only be set true for long lived thread pools. Creating - * lots of short lived thread pools and registering them can lead to out of memory errors - * over long time periods. - * @return ScheduledThreadPoolExecutor - */ - private static ScheduledThreadPoolExecutor createScheduledExecutorService(int numThreads, - final String name, OptionalInt priority, boolean emitThreadPoolMetrics) { - ScheduledThreadPoolExecutor result = - new ScheduledThreadPoolExecutor(numThreads, new NamedThreadFactory(name, priority)) { - - @Override - public void execute(Runnable command) { - super.execute(Context.current().wrap(command)); - } - - @Override - public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { - return super.schedule(Context.current().wrap(callable), delay, unit); - } - - @Override - public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { - return super.schedule(Context.current().wrap(command), delay, unit); - } - - @Override - public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, - long period, TimeUnit unit) { - return super.scheduleAtFixedRate(Context.current().wrap(command), initialDelay, period, - unit); - } - - @Override - public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, - long delay, TimeUnit unit) { - return super.scheduleWithFixedDelay(Context.current().wrap(command), initialDelay, - delay, unit); - } - - @Override - public <T> Future<T> submit(Callable<T> task) { - return super.submit(Context.current().wrap(task)); - } - - @Override - public <T> Future<T> submit(Runnable task, T result) { - return super.submit(Context.current().wrap(task), result); - } - - @Override - public Future<?> submit(Runnable task) { - return super.submit(Context.current().wrap(task)); - } - - @Override - public boolean remove(Runnable task) { - return super.remove(Context.current().wrap(task)); - } - - }; + @Override + public void execute(Runnable command) { + super.execute(Context.current().wrap(command)); + } + + @Override + public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { + return super.schedule(Context.current().wrap(callable), delay, unit); + } + + @Override + public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { + return super.schedule(Context.current().wrap(command), delay, unit); + } + + @Override + public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, + long period, TimeUnit unit) { + return super.scheduleAtFixedRate(Context.current().wrap(command), initialDelay, period, + unit); + } + + @Override + public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, + long delay, TimeUnit unit) { + return super.scheduleWithFixedDelay(Context.current().wrap(command), initialDelay, delay, + unit); + } + + @Override + public <T> Future<T> submit(Callable<T> task) { + return super.submit(Context.current().wrap(task)); + } + + @Override + public <T> Future<T> submit(Runnable task, T result) { + return super.submit(Context.current().wrap(task), result); + } + + @Override + public Future<?> submit(Runnable task) { + return super.submit(Context.current().wrap(task)); + } + + @Override + public boolean remove(Runnable task) { + return super.remove(Context.current().wrap(task)); + } + + }; if (emitThreadPoolMetrics) { MetricsUtil.addExecutorServiceMetrics(result, name); } diff --git a/core/src/main/java/org/apache/accumulo/fate/Fate.java b/core/src/main/java/org/apache/accumulo/fate/Fate.java index 6f0fde2..54ef3e4 100644 --- a/core/src/main/java/org/apache/accumulo/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/fate/Fate.java @@ -229,8 +229,8 @@ public class Fate<T> { * Launches the specified number of worker threads. */ public void startTransactionRunners(AccumuloConfiguration conf) { - final ThreadPoolExecutor pool = (ThreadPoolExecutor) ThreadPools.createExecutorService(conf, - Property.MANAGER_FATE_THREADPOOL_SIZE, true); + final ThreadPoolExecutor pool = + ThreadPools.createExecutorService(conf, Property.MANAGER_FATE_THREADPOOL_SIZE, true); fatePoolWatcher = ThreadPools.createGeneralScheduledExecutorService(conf); fatePoolWatcher.schedule(() -> { // resize the pool if the property changed diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java index 87dcf0e..c91216f 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java @@ -34,8 +34,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.OptionalInt; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -241,7 +239,7 @@ public class MultiThreadedRFileTest { int maxThreads = 10; String name = "MultiThreadedRFileTestThread"; ThreadPoolExecutor pool = ThreadPools.createThreadPool(maxThreads + 1, maxThreads + 1, 5 * 60, - TimeUnit.SECONDS, name, new LinkedBlockingQueue<>(), OptionalInt.empty(), false); + TimeUnit.SECONDS, name, false); try { Runnable runnable = () -> { try { diff --git a/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java b/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java index 906fc00..08dc7ab 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java +++ b/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.NoSuchElementException; -import java.util.OptionalInt; import java.util.TreeMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; @@ -68,7 +67,7 @@ public class ProblemReports implements Iterable<ProblemReport> { * is reporting lots of problems, but problem reports can not be processed */ private ExecutorService reportExecutor = ThreadPools.createThreadPool(0, 1, 60, TimeUnit.SECONDS, - "acu-problem-reporter", new LinkedBlockingQueue<>(500), OptionalInt.empty(), false); + "acu-problem-reporter", new LinkedBlockingQueue<>(500), false); private final ServerContext context; diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java index d464b1e..9d3b4ad 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java @@ -20,7 +20,6 @@ package org.apache.accumulo.manager.upgrade; import java.io.IOException; import java.util.Map; -import java.util.OptionalInt; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.concurrent.SynchronousQueue; @@ -176,8 +175,7 @@ public class UpgradeCoordinator { if (currentVersion < AccumuloDataVersion.get()) { return ThreadPools.createThreadPool(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, - "UpgradeMetadataThreads", new SynchronousQueue<Runnable>(), OptionalInt.empty(), false) - .submit(() -> { + "UpgradeMetadataThreads", new SynchronousQueue<>(), false).submit(() -> { try { for (int v = currentVersion; v < AccumuloDataVersion.get(); v++) { log.info("Upgrading Root from data version {}", v); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 84d106e..c852c1b 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -767,8 +767,8 @@ public class TabletServer extends AbstractServer { throw new RuntimeException(e); } - ThreadPoolExecutor distWorkQThreadPool = (ThreadPoolExecutor) ThreadPools - .createExecutorService(getConfiguration(), Property.TSERV_WORKQ_THREADS, true); + ThreadPoolExecutor distWorkQThreadPool = + ThreadPools.createExecutorService(getConfiguration(), Property.TSERV_WORKQ_THREADS, true); bulkFailedCopyQ = new DistributedWorkQueue(getContext().getZooKeeperRoot() + Constants.ZBULK_FAILED_COPYQ, @@ -939,7 +939,7 @@ public class TabletServer extends AbstractServer { } // Start the pool to handle outgoing replications - final ThreadPoolExecutor replicationThreadPool = (ThreadPoolExecutor) ThreadPools + final ThreadPoolExecutor replicationThreadPool = ThreadPools .createExecutorService(getConfiguration(), Property.REPLICATION_WORKER_THREADS, false); replWorker.setExecutor(replicationThreadPool); replWorker.run(); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java index 7f63210..31432e9 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java @@ -93,18 +93,18 @@ public class TabletServerResourceManager { private static final Logger log = LoggerFactory.getLogger(TabletServerResourceManager.class); - private final ExecutorService minorCompactionThreadPool; - private final ExecutorService splitThreadPool; - private final ExecutorService defaultSplitThreadPool; - private final ExecutorService defaultMigrationPool; - private final ExecutorService migrationPool; - private final ExecutorService assignmentPool; - private final ExecutorService assignMetaDataPool; - private final ExecutorService summaryRetrievalPool; - private final ExecutorService summaryPartitionPool; - private final ExecutorService summaryRemotePool; - - private final Map<String,ExecutorService> scanExecutors; + private final ThreadPoolExecutor minorCompactionThreadPool; + private final ThreadPoolExecutor splitThreadPool; + private final ThreadPoolExecutor defaultSplitThreadPool; + private final ThreadPoolExecutor defaultMigrationPool; + private final ThreadPoolExecutor migrationPool; + private final ThreadPoolExecutor assignmentPool; + private final ThreadPoolExecutor assignMetaDataPool; + private final ThreadPoolExecutor summaryRetrievalPool; + private final ThreadPoolExecutor summaryPartitionPool; + private final ThreadPoolExecutor summaryRemotePool; + + private final Map<String,ThreadPoolExecutor> scanExecutors; private final Map<String,ScanExecutor> scanExecutorChoices; private final ConcurrentHashMap<KeyExtent,RunnableStartedAt> activeAssignments; @@ -136,12 +136,11 @@ public class TabletServerResourceManager { */ private void modifyThreadPoolSizesAtRuntime(IntSupplier maxThreads, String name, final ThreadPoolExecutor tp) { - context.getScheduledExecutor().scheduleWithFixedDelay(() -> { - ThreadPools.resizePool(tp, maxThreads, name); - }, 1000, 10_000, TimeUnit.MILLISECONDS); + context.getScheduledExecutor().scheduleWithFixedDelay( + () -> ThreadPools.resizePool(tp, maxThreads, name), 1, 10, TimeUnit.SECONDS); } - private ExecutorService createPriorityExecutor(ScanExecutorConfig sec, + private ThreadPoolExecutor createPriorityExecutor(ScanExecutorConfig sec, Map<String,Queue<Runnable>> scanExecQueues) { BlockingQueue<Runnable> queue; @@ -186,11 +185,10 @@ public class TabletServerResourceManager { scanExecQueues.put(sec.name, queue); - ExecutorService es = + ThreadPoolExecutor es = ThreadPools.createThreadPool(sec.getCurrentMaxThreads(), sec.getCurrentMaxThreads(), 0L, TimeUnit.MILLISECONDS, "scan-" + sec.name, queue, sec.priority, true); - modifyThreadPoolSizesAtRuntime(sec::getCurrentMaxThreads, "scan-" + sec.name, - (ThreadPoolExecutor) es); + modifyThreadPoolSizesAtRuntime(sec::getCurrentMaxThreads, "scan-" + sec.name, es); return es; } @@ -307,7 +305,7 @@ public class TabletServerResourceManager { ThreadPools.createExecutorService(acuConf, Property.TSERV_MINC_MAXCONCURRENT, true); modifyThreadPoolSizesAtRuntime( () -> context.getConfiguration().getCount(Property.TSERV_MINC_MAXCONCURRENT), - "minor compactor", (ThreadPoolExecutor) minorCompactionThreadPool); + "minor compactor", minorCompactionThreadPool); splitThreadPool = ThreadPools.createThreadPool(0, 1, 1, TimeUnit.SECONDS, "splitter", true); @@ -321,7 +319,7 @@ public class TabletServerResourceManager { ThreadPools.createExecutorService(acuConf, Property.TSERV_MIGRATE_MAXCONCURRENT, true); modifyThreadPoolSizesAtRuntime( () -> context.getConfiguration().getCount(Property.TSERV_MIGRATE_MAXCONCURRENT), - "tablet migration", (ThreadPoolExecutor) migrationPool); + "tablet migration", migrationPool); // not sure if concurrent assignments can run safely... even if they could there is probably no // benefit at startup because @@ -332,7 +330,7 @@ public class TabletServerResourceManager { ThreadPools.createExecutorService(acuConf, Property.TSERV_ASSIGNMENT_MAXCONCURRENT, true); modifyThreadPoolSizesAtRuntime( () -> context.getConfiguration().getCount(Property.TSERV_ASSIGNMENT_MAXCONCURRENT), - "tablet assignment", (ThreadPoolExecutor) assignmentPool); + "tablet assignment", assignmentPool); assignMetaDataPool = ThreadPools.createThreadPool(0, 1, 60, TimeUnit.SECONDS, "metadata tablet assignment", true); @@ -343,19 +341,19 @@ public class TabletServerResourceManager { ThreadPools.createExecutorService(acuConf, Property.TSERV_SUMMARY_RETRIEVAL_THREADS, true); modifyThreadPoolSizesAtRuntime( () -> context.getConfiguration().getCount(Property.TSERV_SUMMARY_RETRIEVAL_THREADS), - "summary file retriever", (ThreadPoolExecutor) summaryRetrievalPool); + "summary file retriever", summaryRetrievalPool); summaryRemotePool = ThreadPools.createExecutorService(acuConf, Property.TSERV_SUMMARY_REMOTE_THREADS, true); modifyThreadPoolSizesAtRuntime( () -> context.getConfiguration().getCount(Property.TSERV_SUMMARY_REMOTE_THREADS), - "summary remote", (ThreadPoolExecutor) summaryRemotePool); + "summary remote", summaryRemotePool); summaryPartitionPool = ThreadPools.createExecutorService(acuConf, Property.TSERV_SUMMARY_PARTITION_THREADS, true); modifyThreadPoolSizesAtRuntime( () -> context.getConfiguration().getCount(Property.TSERV_SUMMARY_PARTITION_THREADS), - "summary partition", (ThreadPoolExecutor) summaryPartitionPool); + "summary partition", summaryPartitionPool); Collection<ScanExecutorConfig> scanExecCfg = acuConf.getScanExecutors(); Map<String,Queue<Runnable>> scanExecQueues = new HashMap<>(); @@ -805,7 +803,7 @@ public class TabletServerResourceManager { ScanDispatch prefs = dispatcher.dispatch(params); scanInfo.scanParams.setScanDispatch(prefs); - ExecutorService executor = scanExecutors.get(prefs.getExecutorName()); + ThreadPoolExecutor executor = scanExecutors.get(prefs.getExecutorName()); if (executor == null) { log.warn( "For table id {}, {} dispatched to non-existent executor {} Using default executor.", diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java index 9e64cff..b964b2a 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java @@ -22,7 +22,6 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashSet; import java.util.List; -import java.util.OptionalInt; import java.util.Set; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -161,7 +160,7 @@ public class InternalCompactionExecutor implements CompactionExecutor { queue = new PriorityBlockingQueue<Runnable>(100, comparator); threadPool = ThreadPools.createThreadPool(threads, threads, 60, TimeUnit.SECONDS, - "compaction." + ceid, queue, OptionalInt.empty(), false); + "compaction." + ceid, queue, false); metricCloser = ceMetrics.addExecutor(ceid, () -> threadPool.getActiveCount(), () -> queuedJob.size());