This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 0fdfee5322278d8833aec541d47bf778489394ac
Author: Christopher Tubbs <ctubb...@apache.org>
AuthorDate: Mon Jan 31 04:17:09 2022 -0500

    Minor cleanup of ThreadPools
    
    * Avoid casting to ThreadPoolExecutor by using that type directly
      instead of its superclass
    * Avoid passing unnecessary queue and priority parameters by leveraging
      more of the overloaded createThreadPool methods
---
 .../accumulo/core/clientImpl/ScannerIterator.java  |   7 +-
 .../accumulo/core/util/threads/ThreadPools.java    | 175 +++++++++++----------
 .../main/java/org/apache/accumulo/fate/Fate.java   |   4 +-
 .../core/file/rfile/MultiThreadedRFileTest.java    |   4 +-
 .../accumulo/server/problems/ProblemReports.java   |   3 +-
 .../manager/upgrade/UpgradeCoordinator.java        |   4 +-
 .../org/apache/accumulo/tserver/TabletServer.java  |   6 +-
 .../tserver/TabletServerResourceManager.java       |  50 +++---
 .../compactions/InternalCompactionExecutor.java    |   3 +-
 9 files changed, 125 insertions(+), 131 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java
index f8d3912..99485c2 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java
@@ -23,7 +23,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
-import java.util.OptionalInt;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.SynchronousQueue;
@@ -65,9 +64,9 @@ public class ScannerIterator implements 
Iterator<Entry<Key,Value>> {
 
   private ScannerImpl.Reporter reporter;
 
-  private static ThreadPoolExecutor readaheadPool = 
ThreadPools.createThreadPool(0,
-      Integer.MAX_VALUE, 3L, TimeUnit.SECONDS, "Accumulo scanner read ahead 
thread",
-      new SynchronousQueue<>(), OptionalInt.empty(), true);
+  private static ThreadPoolExecutor readaheadPool =
+      ThreadPools.createThreadPool(0, Integer.MAX_VALUE, 3L, TimeUnit.SECONDS,
+          "Accumulo scanner read ahead thread", new SynchronousQueue<>(), 
true);
 
   private boolean closed = false;
 
diff --git 
a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java 
b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
index 25807f6..ac4ed3d 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
@@ -21,7 +21,6 @@ package org.apache.accumulo.core.util.threads;
 import java.util.OptionalInt;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledFuture;
@@ -110,7 +109,7 @@ public class ThreadPools {
    *           if property is not handled
    */
   @SuppressWarnings("deprecation")
-  public static ExecutorService createExecutorService(final 
AccumuloConfiguration conf,
+  public static ThreadPoolExecutor createExecutorService(final 
AccumuloConfiguration conf,
       final Property p, boolean emitThreadPoolMetrics) {
 
     switch (p) {
@@ -129,8 +128,7 @@ public class ThreadPools {
         int threads = conf.getCount(p);
         if (threads == 0) {
           return createThreadPool(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
-              "GatherTableInformation", new SynchronousQueue<Runnable>(), 
OptionalInt.empty(),
-              emitThreadPoolMetrics);
+              "GatherTableInformation", new SynchronousQueue<>(), 
emitThreadPoolMetrics);
         } else {
           return createFixedThreadPool(threads, "GatherTableInformation", 
emitThreadPoolMetrics);
         }
@@ -207,7 +205,7 @@ public class ThreadPools {
   public static ThreadPoolExecutor createFixedThreadPool(int numThreads, final 
String name,
       BlockingQueue<Runnable> queue, boolean emitThreadPoolMetrics) {
     return createThreadPool(numThreads, numThreads, DEFAULT_TIMEOUT_MILLISECS,
-        TimeUnit.MILLISECONDS, name, queue, OptionalInt.empty(), 
emitThreadPoolMetrics);
+        TimeUnit.MILLISECONDS, name, queue, emitThreadPoolMetrics);
   }
 
   /**
@@ -232,8 +230,7 @@ public class ThreadPools {
    */
   public static ThreadPoolExecutor createFixedThreadPool(int numThreads, long 
timeOut,
       TimeUnit units, final String name, boolean emitThreadPoolMetrics) {
-    return createThreadPool(numThreads, numThreads, timeOut, units, name,
-        new LinkedBlockingQueue<Runnable>(), OptionalInt.empty(), 
emitThreadPoolMetrics);
+    return createThreadPool(numThreads, numThreads, timeOut, units, name, 
emitThreadPoolMetrics);
   }
 
   /**
@@ -261,7 +258,38 @@ public class ThreadPools {
   public static ThreadPoolExecutor createThreadPool(int coreThreads, int 
maxThreads, long timeOut,
       TimeUnit units, final String name, boolean emitThreadPoolMetrics) {
     return createThreadPool(coreThreads, maxThreads, timeOut, units, name,
-        new LinkedBlockingQueue<Runnable>(), OptionalInt.empty(), 
emitThreadPoolMetrics);
+        new LinkedBlockingQueue<>(), emitThreadPoolMetrics);
+  }
+
+  /**
+   * Create a named thread pool
+   *
+   * @param coreThreads
+   *          number of threads
+   * @param maxThreads
+   *          max number of threads
+   * @param timeOut
+   *          core thread time out
+   * @param units
+   *          core thread time out units
+   * @param name
+   *          thread pool name
+   * @param queue
+   *          queue to use for tasks
+   * @param emitThreadPoolMetrics
+   *          When set to true will emit metrics and register the metrics in a 
static registry.
+   *          After the thread pool is deleted, there will still be metrics 
objects related to it in
+   *          the static registry. There is no way to clean these left over 
objects up therefore its
+   *          recommended that this option only be set true for long lived 
thread pools. Creating
+   *          lots of short lived thread pools and registering them can lead 
to out of memory errors
+   *          over long time periods.
+   * @return ThreadPoolExecutor
+   */
+  public static ThreadPoolExecutor createThreadPool(int coreThreads, int 
maxThreads, long timeOut,
+      TimeUnit units, final String name, BlockingQueue<Runnable> queue,
+      boolean emitThreadPoolMetrics) {
+    return createThreadPool(coreThreads, maxThreads, timeOut, units, name, 
queue,
+        OptionalInt.empty(), emitThreadPoolMetrics);
   }
 
   /**
@@ -293,8 +321,8 @@ public class ThreadPools {
   public static ThreadPoolExecutor createThreadPool(int coreThreads, int 
maxThreads, long timeOut,
       TimeUnit units, final String name, BlockingQueue<Runnable> queue, 
OptionalInt priority,
       boolean emitThreadPoolMetrics) {
-    ThreadPoolExecutor result = new ThreadPoolExecutor(coreThreads, 
maxThreads, timeOut, units,
-        queue, new NamedThreadFactory(name, priority)) {
+    var result = new ThreadPoolExecutor(coreThreads, maxThreads, timeOut, 
units, queue,
+        new NamedThreadFactory(name, priority)) {
 
       @Override
       public void execute(Runnable arg0) {
@@ -358,83 +386,58 @@ public class ThreadPools {
    */
   public static ScheduledThreadPoolExecutor createScheduledExecutorService(int 
numThreads,
       final String name, boolean emitThreadPoolMetrics) {
-    return createScheduledExecutorService(numThreads, name, 
OptionalInt.empty(),
-        emitThreadPoolMetrics);
-  }
+    var result = new ScheduledThreadPoolExecutor(numThreads, new 
NamedThreadFactory(name)) {
 
-  /**
-   * Create a named ScheduledThreadPool
-   *
-   * @param numThreads
-   *          number of threads
-   * @param name
-   *          thread pool name
-   * @param priority
-   *          thread priority
-   * @param emitThreadPoolMetrics
-   *          When set to true will emit metrics and register the metrics in a 
static registry.
-   *          After the thread pool is deleted, there will still be metrics 
objects related to it in
-   *          the static registry. There is no way to clean these left over 
objects up therefore its
-   *          recommended that this option only be set true for long lived 
thread pools. Creating
-   *          lots of short lived thread pools and registering them can lead 
to out of memory errors
-   *          over long time periods.
-   * @return ScheduledThreadPoolExecutor
-   */
-  private static ScheduledThreadPoolExecutor 
createScheduledExecutorService(int numThreads,
-      final String name, OptionalInt priority, boolean emitThreadPoolMetrics) {
-    ScheduledThreadPoolExecutor result =
-        new ScheduledThreadPoolExecutor(numThreads, new 
NamedThreadFactory(name, priority)) {
-
-          @Override
-          public void execute(Runnable command) {
-            super.execute(Context.current().wrap(command));
-          }
-
-          @Override
-          public <V> ScheduledFuture<V> schedule(Callable<V> callable, long 
delay, TimeUnit unit) {
-            return super.schedule(Context.current().wrap(callable), delay, 
unit);
-          }
-
-          @Override
-          public ScheduledFuture<?> schedule(Runnable command, long delay, 
TimeUnit unit) {
-            return super.schedule(Context.current().wrap(command), delay, 
unit);
-          }
-
-          @Override
-          public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long 
initialDelay,
-              long period, TimeUnit unit) {
-            return super.scheduleAtFixedRate(Context.current().wrap(command), 
initialDelay, period,
-                unit);
-          }
-
-          @Override
-          public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, 
long initialDelay,
-              long delay, TimeUnit unit) {
-            return 
super.scheduleWithFixedDelay(Context.current().wrap(command), initialDelay,
-                delay, unit);
-          }
-
-          @Override
-          public <T> Future<T> submit(Callable<T> task) {
-            return super.submit(Context.current().wrap(task));
-          }
-
-          @Override
-          public <T> Future<T> submit(Runnable task, T result) {
-            return super.submit(Context.current().wrap(task), result);
-          }
-
-          @Override
-          public Future<?> submit(Runnable task) {
-            return super.submit(Context.current().wrap(task));
-          }
-
-          @Override
-          public boolean remove(Runnable task) {
-            return super.remove(Context.current().wrap(task));
-          }
-
-        };
+      @Override
+      public void execute(Runnable command) {
+        super.execute(Context.current().wrap(command));
+      }
+
+      @Override
+      public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, 
TimeUnit unit) {
+        return super.schedule(Context.current().wrap(callable), delay, unit);
+      }
+
+      @Override
+      public ScheduledFuture<?> schedule(Runnable command, long delay, 
TimeUnit unit) {
+        return super.schedule(Context.current().wrap(command), delay, unit);
+      }
+
+      @Override
+      public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long 
initialDelay,
+          long period, TimeUnit unit) {
+        return super.scheduleAtFixedRate(Context.current().wrap(command), 
initialDelay, period,
+            unit);
+      }
+
+      @Override
+      public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long 
initialDelay,
+          long delay, TimeUnit unit) {
+        return super.scheduleWithFixedDelay(Context.current().wrap(command), 
initialDelay, delay,
+            unit);
+      }
+
+      @Override
+      public <T> Future<T> submit(Callable<T> task) {
+        return super.submit(Context.current().wrap(task));
+      }
+
+      @Override
+      public <T> Future<T> submit(Runnable task, T result) {
+        return super.submit(Context.current().wrap(task), result);
+      }
+
+      @Override
+      public Future<?> submit(Runnable task) {
+        return super.submit(Context.current().wrap(task));
+      }
+
+      @Override
+      public boolean remove(Runnable task) {
+        return super.remove(Context.current().wrap(task));
+      }
+
+    };
     if (emitThreadPoolMetrics) {
       MetricsUtil.addExecutorServiceMetrics(result, name);
     }
diff --git a/core/src/main/java/org/apache/accumulo/fate/Fate.java 
b/core/src/main/java/org/apache/accumulo/fate/Fate.java
index 6f0fde2..54ef3e4 100644
--- a/core/src/main/java/org/apache/accumulo/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/fate/Fate.java
@@ -229,8 +229,8 @@ public class Fate<T> {
    * Launches the specified number of worker threads.
    */
   public void startTransactionRunners(AccumuloConfiguration conf) {
-    final ThreadPoolExecutor pool = (ThreadPoolExecutor) 
ThreadPools.createExecutorService(conf,
-        Property.MANAGER_FATE_THREADPOOL_SIZE, true);
+    final ThreadPoolExecutor pool =
+        ThreadPools.createExecutorService(conf, 
Property.MANAGER_FATE_THREADPOOL_SIZE, true);
     fatePoolWatcher = ThreadPools.createGeneralScheduledExecutorService(conf);
     fatePoolWatcher.schedule(() -> {
       // resize the pool if the property changed
diff --git 
a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
 
b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
index 87dcf0e..c91216f 100644
--- 
a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
+++ 
b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
@@ -34,8 +34,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.OptionalInt;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
@@ -241,7 +239,7 @@ public class MultiThreadedRFileTest {
       int maxThreads = 10;
       String name = "MultiThreadedRFileTestThread";
       ThreadPoolExecutor pool = ThreadPools.createThreadPool(maxThreads + 1, 
maxThreads + 1, 5 * 60,
-          TimeUnit.SECONDS, name, new LinkedBlockingQueue<>(), 
OptionalInt.empty(), false);
+          TimeUnit.SECONDS, name, false);
       try {
         Runnable runnable = () -> {
           try {
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java
 
b/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java
index 906fc00..08dc7ab 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java
@@ -25,7 +25,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
-import java.util.OptionalInt;
 import java.util.TreeMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -68,7 +67,7 @@ public class ProblemReports implements 
Iterable<ProblemReport> {
    * is reporting lots of problems, but problem reports can not be processed
    */
   private ExecutorService reportExecutor = ThreadPools.createThreadPool(0, 1, 
60, TimeUnit.SECONDS,
-      "acu-problem-reporter", new LinkedBlockingQueue<>(500), 
OptionalInt.empty(), false);
+      "acu-problem-reporter", new LinkedBlockingQueue<>(500), false);
 
   private final ServerContext context;
 
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
index d464b1e..9d3b4ad 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
@@ -20,7 +20,6 @@ package org.apache.accumulo.manager.upgrade;
 
 import java.io.IOException;
 import java.util.Map;
-import java.util.OptionalInt;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
 import java.util.concurrent.SynchronousQueue;
@@ -176,8 +175,7 @@ public class UpgradeCoordinator {
 
     if (currentVersion < AccumuloDataVersion.get()) {
       return ThreadPools.createThreadPool(0, Integer.MAX_VALUE, 60L, 
TimeUnit.SECONDS,
-          "UpgradeMetadataThreads", new SynchronousQueue<Runnable>(), 
OptionalInt.empty(), false)
-          .submit(() -> {
+          "UpgradeMetadataThreads", new SynchronousQueue<>(), false).submit(() 
-> {
             try {
               for (int v = currentVersion; v < AccumuloDataVersion.get(); v++) 
{
                 log.info("Upgrading Root from data version {}", v);
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 84d106e..c852c1b 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -767,8 +767,8 @@ public class TabletServer extends AbstractServer {
       throw new RuntimeException(e);
     }
 
-    ThreadPoolExecutor distWorkQThreadPool = (ThreadPoolExecutor) ThreadPools
-        .createExecutorService(getConfiguration(), 
Property.TSERV_WORKQ_THREADS, true);
+    ThreadPoolExecutor distWorkQThreadPool =
+        ThreadPools.createExecutorService(getConfiguration(), 
Property.TSERV_WORKQ_THREADS, true);
 
     bulkFailedCopyQ =
         new DistributedWorkQueue(getContext().getZooKeeperRoot() + 
Constants.ZBULK_FAILED_COPYQ,
@@ -939,7 +939,7 @@ public class TabletServer extends AbstractServer {
     }
 
     // Start the pool to handle outgoing replications
-    final ThreadPoolExecutor replicationThreadPool = (ThreadPoolExecutor) 
ThreadPools
+    final ThreadPoolExecutor replicationThreadPool = ThreadPools
         .createExecutorService(getConfiguration(), 
Property.REPLICATION_WORKER_THREADS, false);
     replWorker.setExecutor(replicationThreadPool);
     replWorker.run();
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index 7f63210..31432e9 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -93,18 +93,18 @@ public class TabletServerResourceManager {
 
   private static final Logger log = 
LoggerFactory.getLogger(TabletServerResourceManager.class);
 
-  private final ExecutorService minorCompactionThreadPool;
-  private final ExecutorService splitThreadPool;
-  private final ExecutorService defaultSplitThreadPool;
-  private final ExecutorService defaultMigrationPool;
-  private final ExecutorService migrationPool;
-  private final ExecutorService assignmentPool;
-  private final ExecutorService assignMetaDataPool;
-  private final ExecutorService summaryRetrievalPool;
-  private final ExecutorService summaryPartitionPool;
-  private final ExecutorService summaryRemotePool;
-
-  private final Map<String,ExecutorService> scanExecutors;
+  private final ThreadPoolExecutor minorCompactionThreadPool;
+  private final ThreadPoolExecutor splitThreadPool;
+  private final ThreadPoolExecutor defaultSplitThreadPool;
+  private final ThreadPoolExecutor defaultMigrationPool;
+  private final ThreadPoolExecutor migrationPool;
+  private final ThreadPoolExecutor assignmentPool;
+  private final ThreadPoolExecutor assignMetaDataPool;
+  private final ThreadPoolExecutor summaryRetrievalPool;
+  private final ThreadPoolExecutor summaryPartitionPool;
+  private final ThreadPoolExecutor summaryRemotePool;
+
+  private final Map<String,ThreadPoolExecutor> scanExecutors;
   private final Map<String,ScanExecutor> scanExecutorChoices;
 
   private final ConcurrentHashMap<KeyExtent,RunnableStartedAt> 
activeAssignments;
@@ -136,12 +136,11 @@ public class TabletServerResourceManager {
    */
   private void modifyThreadPoolSizesAtRuntime(IntSupplier maxThreads, String 
name,
       final ThreadPoolExecutor tp) {
-    context.getScheduledExecutor().scheduleWithFixedDelay(() -> {
-      ThreadPools.resizePool(tp, maxThreads, name);
-    }, 1000, 10_000, TimeUnit.MILLISECONDS);
+    context.getScheduledExecutor().scheduleWithFixedDelay(
+        () -> ThreadPools.resizePool(tp, maxThreads, name), 1, 10, 
TimeUnit.SECONDS);
   }
 
-  private ExecutorService createPriorityExecutor(ScanExecutorConfig sec,
+  private ThreadPoolExecutor createPriorityExecutor(ScanExecutorConfig sec,
       Map<String,Queue<Runnable>> scanExecQueues) {
 
     BlockingQueue<Runnable> queue;
@@ -186,11 +185,10 @@ public class TabletServerResourceManager {
 
     scanExecQueues.put(sec.name, queue);
 
-    ExecutorService es =
+    ThreadPoolExecutor es =
         ThreadPools.createThreadPool(sec.getCurrentMaxThreads(), 
sec.getCurrentMaxThreads(), 0L,
             TimeUnit.MILLISECONDS, "scan-" + sec.name, queue, sec.priority, 
true);
-    modifyThreadPoolSizesAtRuntime(sec::getCurrentMaxThreads, "scan-" + 
sec.name,
-        (ThreadPoolExecutor) es);
+    modifyThreadPoolSizesAtRuntime(sec::getCurrentMaxThreads, "scan-" + 
sec.name, es);
     return es;
 
   }
@@ -307,7 +305,7 @@ public class TabletServerResourceManager {
         ThreadPools.createExecutorService(acuConf, 
Property.TSERV_MINC_MAXCONCURRENT, true);
     modifyThreadPoolSizesAtRuntime(
         () -> 
context.getConfiguration().getCount(Property.TSERV_MINC_MAXCONCURRENT),
-        "minor compactor", (ThreadPoolExecutor) minorCompactionThreadPool);
+        "minor compactor", minorCompactionThreadPool);
 
     splitThreadPool = ThreadPools.createThreadPool(0, 1, 1, TimeUnit.SECONDS, 
"splitter", true);
 
@@ -321,7 +319,7 @@ public class TabletServerResourceManager {
         ThreadPools.createExecutorService(acuConf, 
Property.TSERV_MIGRATE_MAXCONCURRENT, true);
     modifyThreadPoolSizesAtRuntime(
         () -> 
context.getConfiguration().getCount(Property.TSERV_MIGRATE_MAXCONCURRENT),
-        "tablet migration", (ThreadPoolExecutor) migrationPool);
+        "tablet migration", migrationPool);
 
     // not sure if concurrent assignments can run safely... even if they could 
there is probably no
     // benefit at startup because
@@ -332,7 +330,7 @@ public class TabletServerResourceManager {
         ThreadPools.createExecutorService(acuConf, 
Property.TSERV_ASSIGNMENT_MAXCONCURRENT, true);
     modifyThreadPoolSizesAtRuntime(
         () -> 
context.getConfiguration().getCount(Property.TSERV_ASSIGNMENT_MAXCONCURRENT),
-        "tablet assignment", (ThreadPoolExecutor) assignmentPool);
+        "tablet assignment", assignmentPool);
 
     assignMetaDataPool = ThreadPools.createThreadPool(0, 1, 60, 
TimeUnit.SECONDS,
         "metadata tablet assignment", true);
@@ -343,19 +341,19 @@ public class TabletServerResourceManager {
         ThreadPools.createExecutorService(acuConf, 
Property.TSERV_SUMMARY_RETRIEVAL_THREADS, true);
     modifyThreadPoolSizesAtRuntime(
         () -> 
context.getConfiguration().getCount(Property.TSERV_SUMMARY_RETRIEVAL_THREADS),
-        "summary file retriever", (ThreadPoolExecutor) summaryRetrievalPool);
+        "summary file retriever", summaryRetrievalPool);
 
     summaryRemotePool =
         ThreadPools.createExecutorService(acuConf, 
Property.TSERV_SUMMARY_REMOTE_THREADS, true);
     modifyThreadPoolSizesAtRuntime(
         () -> 
context.getConfiguration().getCount(Property.TSERV_SUMMARY_REMOTE_THREADS),
-        "summary remote", (ThreadPoolExecutor) summaryRemotePool);
+        "summary remote", summaryRemotePool);
 
     summaryPartitionPool =
         ThreadPools.createExecutorService(acuConf, 
Property.TSERV_SUMMARY_PARTITION_THREADS, true);
     modifyThreadPoolSizesAtRuntime(
         () -> 
context.getConfiguration().getCount(Property.TSERV_SUMMARY_PARTITION_THREADS),
-        "summary partition", (ThreadPoolExecutor) summaryPartitionPool);
+        "summary partition", summaryPartitionPool);
 
     Collection<ScanExecutorConfig> scanExecCfg = acuConf.getScanExecutors();
     Map<String,Queue<Runnable>> scanExecQueues = new HashMap<>();
@@ -805,7 +803,7 @@ public class TabletServerResourceManager {
       ScanDispatch prefs = dispatcher.dispatch(params);
       scanInfo.scanParams.setScanDispatch(prefs);
 
-      ExecutorService executor = scanExecutors.get(prefs.getExecutorName());
+      ThreadPoolExecutor executor = scanExecutors.get(prefs.getExecutorName());
       if (executor == null) {
         log.warn(
             "For table id {}, {} dispatched to non-existent executor {} Using 
default executor.",
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java
index 9e64cff..b964b2a 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java
@@ -22,7 +22,6 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
-import java.util.OptionalInt;
 import java.util.Set;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -161,7 +160,7 @@ public class InternalCompactionExecutor implements 
CompactionExecutor {
     queue = new PriorityBlockingQueue<Runnable>(100, comparator);
 
     threadPool = ThreadPools.createThreadPool(threads, threads, 60, 
TimeUnit.SECONDS,
-        "compaction." + ceid, queue, OptionalInt.empty(), false);
+        "compaction." + ceid, queue, false);
 
     metricCloser =
         ceMetrics.addExecutor(ceid, () -> threadPool.getActiveCount(), () -> 
queuedJob.size());

Reply via email to