This is an automated email from the ASF dual-hosted git repository.

edcoleman pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 2f4b0372512b3cf677e7a66d4b5ee8482a36a0da
Merge: ce83488245 0ad96b1dc0
Author: Ed Coleman <edcole...@apache.org>
AuthorDate: Tue Mar 19 20:31:36 2024 +0000

    Merge remote-tracking branch 'upstream/main' into elasticity

 .../accumulo/core/clientImpl/ClientContext.java    |   9 +-
 .../core/clientImpl/ConditionalWriterImpl.java     |   2 +-
 .../core/clientImpl/InstanceOperationsImpl.java    |   4 +-
 .../core/clientImpl/TableOperationsImpl.java       |   4 +-
 .../core/clientImpl/TabletServerBatchReader.java   |   5 +-
 .../core/clientImpl/TabletServerBatchWriter.java   |   8 +-
 .../accumulo/core/clientImpl/bulk/BulkImport.java  |   8 +-
 .../accumulo/core/file/BloomFilterLayer.java       |   5 +-
 .../file/blockfile/cache/lru/LruBlockCache.java    |   4 +-
 .../blockfile/cache/tinylfu/TinyLfuBlockCache.java |   2 +-
 .../util/compaction/ExternalCompactionUtil.java    |   9 +-
 .../accumulo/core/util/threads/ThreadPools.java    | 379 +++++++++++++--------
 .../core/file/rfile/MultiThreadedRFileTest.java    |   7 +-
 .../threads/ThreadPoolExecutorBuilderTest.java     |  79 +++++
 .../server/conf/ServerConfigurationFactory.java    |   4 +-
 .../conf/store/impl/PropCacheCaffeineImpl.java     |   7 +-
 .../server/conf/store/impl/PropStoreWatcher.java   |   5 +-
 .../accumulo/server/fs/VolumeManagerImpl.java      |   4 +-
 .../accumulo/server/problems/ProblemReports.java   |   7 +-
 .../apache/accumulo/server/rpc/TServerUtils.java   |   7 +-
 .../server/util/RemoveEntriesForMissingFiles.java  |   4 +-
 .../server/util/VerifyTabletAssignments.java       |   5 +-
 .../server/conf/store/impl/ReadyMonitorTest.java   |   4 +-
 .../main/java/org/apache/accumulo/gc/GCRun.java    |   2 +-
 .../compaction/coordinator/QueueMetrics.java       |   4 +-
 .../accumulo/manager/metrics/fate/FateMetrics.java |   4 +-
 .../accumulo/manager/recovery/RecoveryManager.java |   4 +-
 .../apache/accumulo/manager/split/Splitter.java    |   5 +-
 .../manager/tableOps/bulkVer2/TabletRefresher.java |   2 +-
 .../manager/upgrade/UpgradeCoordinator.java        |   8 +-
 .../tserver/TabletServerResourceManager.java       |  21 +-
 .../org/apache/accumulo/tserver/log/LogSorter.java |   5 +-
 .../accumulo/tserver/log/TabletServerLogger.java   |   4 +-
 .../accumulo/test/BalanceWithOfflineTableIT.java   |   5 +-
 .../test/functional/BatchWriterFlushIT.java        |   2 +-
 35 files changed, 409 insertions(+), 229 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
index c009e14b14,02aeef6bf6..bf650a077f
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
@@@ -372,8 -372,8 +372,8 @@@ public class ConditionalWriterImpl impl
      this.auths = config.getAuthorizations();
      this.ve = new VisibilityEvaluator(config.getAuthorizations());
      this.threadPool = context.threadPools().createScheduledExecutorService(
-         config.getMaxWriteThreads(), this.getClass().getSimpleName(), false);
+         config.getMaxWriteThreads(), this.getClass().getSimpleName());
 -    this.locator = new SyncingTabletLocator(context, tableId);
 +    this.locator = new SyncingClientTabletCache(context, tableId);
      this.serverQueues = new HashMap<>();
      this.tableId = tableId;
      this.tableName = tableName;
diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
index 297ed70495,6b94fd81cf..2ea43612f1
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
@@@ -448,74 -422,109 +448,76 @@@ public class TableOperationsImpl extend
      }
    }
  
 -  private static class SplitEnv {
 -    private final String tableName;
 -    private final TableId tableId;
 -    private final ExecutorService executor;
 -    private final CountDownLatch latch;
 -    private final AtomicReference<Exception> exception;
 -
 -    SplitEnv(String tableName, TableId tableId, ExecutorService executor, 
CountDownLatch latch,
 -        AtomicReference<Exception> exception) {
 -      this.tableName = tableName;
 -      this.tableId = tableId;
 -      this.executor = executor;
 -      this.latch = latch;
 -      this.exception = exception;
 -    }
 -  }
 -
 -  private class SplitTask implements Runnable {
 -
 -    private List<Text> splits;
 -    private SplitEnv env;
 +  /**
 +   * On the server side the fate operation will exit w/o an error if the tablet requested to split
 +   * does not exist. When this happens it will also return an empty string. In the case where the
 +   * fate operation successfully splits the tablet it will return the following string. This code
 +   * uses this return value to see if it needs to retry finding the tablet.
 +   */
 +  public static final String SPLIT_SUCCESS_MSG = "SPLIT_SUCCEEDED";
  
 -    SplitTask(SplitEnv env, List<Text> splits) {
 -      this.env = env;
 -      this.splits = splits;
 -    }
 +  @Override
 +  public void addSplits(String tableName, SortedSet<Text> splits)
 +      throws AccumuloException, TableNotFoundException, 
AccumuloSecurityException {
  
 -    @Override
 -    public void run() {
 -      try {
 -        if (env.exception.get() != null) {
 -          return;
 -        }
 +    EXISTING_TABLE_NAME.validate(tableName);
  
 -        if (splits.size() <= 2) {
 -          addSplits(env, new TreeSet<>(splits));
 -          splits.forEach(s -> env.latch.countDown());
 -          return;
 -        }
 +    TableId tableId = context.getTableId(tableName);
  
 -        int mid = splits.size() / 2;
 +    // TODO should there be a server side check for this?
 +    context.requireNotOffline(tableId, tableName);
  
 -        // split the middle split point to ensure that child task split
 -        // different tablets and can therefore run in parallel
 -        addSplits(env, new TreeSet<>(splits.subList(mid, mid + 1)));
 -        env.latch.countDown();
 +    ClientTabletCache tabLocator = ClientTabletCache.getInstance(context, 
tableId);
  
 -        env.executor.execute(new SplitTask(env, splits.subList(0, mid)));
 -        env.executor.execute(new SplitTask(env, splits.subList(mid + 1, 
splits.size())));
 +    SortedSet<Text> splitsTodo = new TreeSet<>(splits);
-     ExecutorService executor = context.threadPools().createFixedThreadPool(16, "addSplits", false);
++    ExecutorService executor =
++        context.threadPools().getPoolBuilder("addSplits").numCoreThreads(16).build();
+ 
 -      } catch (Exception t) {
 -        env.exception.compareAndSet(null, t);
 -      }
 -    }
 +    try {
 +      while (!splitsTodo.isEmpty()) {
  
 -  }
 +        tabLocator.invalidateCache();
  
 -  @Override
 -  public void addSplits(String tableName, SortedSet<Text> partitionKeys)
 -      throws TableNotFoundException, AccumuloException, 
AccumuloSecurityException {
 -    EXISTING_TABLE_NAME.validate(tableName);
 +        Map<KeyExtent,List<Text>> tabletSplits =
 +            mapSplitsToTablets(tableName, tableId, tabLocator, splitsTodo);
  
 -    TableId tableId = context.getTableId(tableName);
 -    List<Text> splits = new ArrayList<>(partitionKeys);
 +        List<Future<List<Text>>> splitTasks = new ArrayList<>();
  
 -    // should be sorted because we copied from a sorted set, but that makes
 -    // assumptions about how the copy was done so resort to be sure.
 -    Collections.sort(splits);
 -    CountDownLatch latch = new CountDownLatch(splits.size());
 -    AtomicReference<Exception> exception = new AtomicReference<>(null);
 +        for (Entry<KeyExtent,List<Text>> splitsForTablet : 
tabletSplits.entrySet()) {
 +          Callable<List<Text>> splitTask = createSplitTask(tableName, 
splitsForTablet);
 +          splitTasks.add(executor.submit(splitTask));
 +        }
  
 -    ExecutorService executor =
 -        
context.threadPools().getPoolBuilder("addSplits").numCoreThreads(16).build();
 -    try {
 -      executor.execute(
 -          new SplitTask(new SplitEnv(tableName, tableId, executor, latch, 
exception), splits));
 -
 -      while (!latch.await(100, MILLISECONDS)) {
 -        if (exception.get() != null) {
 -          executor.shutdownNow();
 -          Throwable excep = exception.get();
 -          // Below all exceptions are wrapped and rethrown. This is done so 
that the user knows what
 -          // code path got them here. If the wrapping was not done, the
 -          // user would only have the stack trace for the background thread.
 -          if (excep instanceof TableNotFoundException) {
 -            TableNotFoundException tnfe = (TableNotFoundException) excep;
 -            throw new TableNotFoundException(tableId.canonical(), tableName,
 -                "Table not found by background thread", tnfe);
 -          } else if (excep instanceof TableOfflineException) {
 -            log.debug("TableOfflineException occurred in background thread. Throwing new exception",
 -                excep);
 -            throw new TableOfflineException(tableId, tableName);
 -          } else if (excep instanceof AccumuloSecurityException) {
 -            // base == background accumulo security exception
 -            AccumuloSecurityException base = (AccumuloSecurityException) 
excep;
 -            throw new AccumuloSecurityException(base.getUser(), 
base.asThriftException().getCode(),
 -                base.getTableInfo(), excep);
 -          } else if (excep instanceof AccumuloServerException) {
 -            throw new AccumuloServerException((AccumuloServerException) 
excep);
 -          } else if (excep instanceof Error) {
 -            throw new Error(excep);
 -          } else {
 -            throw new AccumuloException(excep);
 +        for (var future : splitTasks) {
 +          try {
 +            var completedSplits = future.get();
 +            completedSplits.forEach(splitsTodo::remove);
 +          } catch (ExecutionException ee) {
 +            Throwable excep = ee.getCause();
 +            // Below all exceptions are wrapped and rethrown. This is done so 
that the user knows
 +            // what code path got them here. If the wrapping was not done, 
the user would only
 +            // have the stack trace for the background thread.
 +            if (excep instanceof TableNotFoundException) {
 +              TableNotFoundException tnfe = (TableNotFoundException) excep;
 +              throw new TableNotFoundException(tableId.canonical(), tableName,
 +                  "Table not found by background thread", tnfe);
 +            } else if (excep instanceof TableOfflineException) {
 +              log.debug(
 +                  "TableOfflineException occurred in background thread. Throwing new exception",
 +                  excep);
 +              throw new TableOfflineException(tableId, tableName);
 +            } else if (excep instanceof AccumuloSecurityException) {
 +              // base == background accumulo security exception
 +              AccumuloSecurityException base = (AccumuloSecurityException) 
excep;
 +              throw new AccumuloSecurityException(base.getUser(),
 +                  base.asThriftException().getCode(), base.getTableInfo(), 
excep);
 +            } else if (excep instanceof AccumuloServerException) {
 +              throw new AccumuloServerException((AccumuloServerException) 
excep);
 +            } else {
 +              throw new AccumuloException(excep);
 +            }
 +          } catch (InterruptedException e) {
 +            throw new IllegalStateException(e);
            }
          }
        }
diff --cc core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java
index 3656b99815,46a07682bd..48bf8cc999
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java
@@@ -63,15 -62,15 +63,15 @@@ public final class TinyLfuBlockCache im
    private final Policy.Eviction<String,Block> policy;
    private final int maxSize;
    private final ScheduledExecutorService statsExecutor = 
ThreadPools.getServerThreadPools()
-       .createScheduledExecutorService(1, "TinyLfuBlockCacheStatsExecutor", 
false);
+       .createScheduledExecutorService(1, "TinyLfuBlockCacheStatsExecutor");
  
    public TinyLfuBlockCache(Configuration conf, CacheType type) {
 -    cache = Caffeine.newBuilder()
 +    cache = 
Caches.getInstance().createNewBuilder(CacheName.TINYLFU_BLOCK_CACHE, false)
          .initialCapacity((int) Math.ceil(1.2 * conf.getMaxSize(type) / 
conf.getBlockSize()))
 -        .weigher((String blockName, Block block) -> {
 +        .recordStats().weigher((String blockName, Block block) -> {
            int keyWeight = ClassSize.align(blockName.length()) + 
ClassSize.STRING;
            return keyWeight + block.weight();
 -        }).maximumWeight(conf.getMaxSize(type)).recordStats().build();
 +        }).maximumWeight(conf.getMaxSize(type)).build();
      policy = cache.policy().eviction().orElseThrow();
      maxSize = (int) Math.min(Integer.MAX_VALUE, policy.getMaximum());
      ScheduledFuture<?> future = 
statsExecutor.scheduleAtFixedRate(this::logStats, STATS_PERIOD_SEC,
diff --cc core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
index 0d42f659c8,35c358b7ed..86181247a8
--- a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
@@@ -218,12 -218,11 +218,12 @@@ public class ExternalCompactionUtil 
     */
    public static List<RunningCompaction> 
getCompactionsRunningOnCompactors(ClientContext context) {
      final List<RunningCompactionFuture> rcFutures = new ArrayList<>();
-     final ExecutorService executor = 
ThreadPools.getServerThreadPools().createFixedThreadPool(16,
-         "CompactorRunningCompactions", false);
+     final ExecutorService executor = ThreadPools.getServerThreadPools()
+         
.getPoolBuilder("CompactorRunningCompactions").numCoreThreads(16).build();
 -    getCompactorAddrs(context).forEach((q, hp) -> {
 +
 +    getCompactorAddrs(context).forEach((group, hp) -> {
        hp.forEach(hostAndPort -> {
 -        rcFutures.add(new RunningCompactionFuture(q, hostAndPort,
 +        rcFutures.add(new RunningCompactionFuture(group, hostAndPort,
              executor.submit(() -> getRunningCompaction(hostAndPort, 
context))));
        });
      });
diff --cc core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
index ad5aa4710b,334fb46a53..d520c01d87
--- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
@@@ -251,35 -269,74 +269,68 @@@ public class ThreadPools 
          return createScheduledExecutorService(conf.getCount(p), 
"GeneralExecutor",
              emitThreadPoolMetrics);
        case MANAGER_FATE_THREADPOOL_SIZE:
-         return createFixedThreadPool(conf.getCount(p), "Repo Runner", 
emitThreadPoolMetrics);
+         builder = getPoolBuilder("Repo Runner").numCoreThreads(conf.getCount(p));
+         if (emitThreadPoolMetrics) {
+           builder.enableThreadPoolMetrics();
+         }
+         return builder.build();
        case MANAGER_STATUS_THREAD_POOL_SIZE:
+         builder = getPoolBuilder("GatherTableInformation");
          int threads = conf.getCount(p);
          if (threads == 0) {
-           return createThreadPool(0, Integer.MAX_VALUE, 60L, SECONDS, 
"GatherTableInformation",
-               new SynchronousQueue<>(), emitThreadPoolMetrics);
+           
builder.numCoreThreads(0).numMaxThreads(Integer.MAX_VALUE).withTimeOut(60L, 
SECONDS)
+               .withQueue(new SynchronousQueue<>());
          } else {
-           return createFixedThreadPool(threads, "GatherTableInformation", 
emitThreadPoolMetrics);
+           builder.numCoreThreads(threads);
          }
+         if (emitThreadPoolMetrics) {
+           builder.enableThreadPoolMetrics();
+         }
+         return builder.build();
 -      case TSERV_WORKQ_THREADS:
 -        builder = getPoolBuilder("distributed work queue").numCoreThreads(conf.getCount(p));
 -        if (emitThreadPoolMetrics) {
 -          builder.enableThreadPoolMetrics();
 -        }
 -        return builder.build();
        case TSERV_MINC_MAXCONCURRENT:
-         return createFixedThreadPool(conf.getCount(p), 0L, MILLISECONDS, 
"minor compactor",
-             emitThreadPoolMetrics);
+         builder = getPoolBuilder("minor compactor").numCoreThreads(conf.getCount(p)).withTimeOut(0L,
+             MILLISECONDS);
+         if (emitThreadPoolMetrics) {
+           builder.enableThreadPoolMetrics();
+         }
+         return builder.build();
        case TSERV_MIGRATE_MAXCONCURRENT:
-         return createFixedThreadPool(conf.getCount(p), 0L, MILLISECONDS, 
"tablet migration",
-             emitThreadPoolMetrics);
+         builder = getPoolBuilder("tablet migration").numCoreThreads(conf.getCount(p))
+             .withTimeOut(0L, MILLISECONDS);
+         if (emitThreadPoolMetrics) {
+           builder.enableThreadPoolMetrics();
+         }
+         return builder.build();
        case TSERV_ASSIGNMENT_MAXCONCURRENT:
-         return createFixedThreadPool(conf.getCount(p), 0L, MILLISECONDS, 
"tablet assignment",
-             emitThreadPoolMetrics);
+         builder = getPoolBuilder("tablet assignment").numCoreThreads(conf.getCount(p))
+             .withTimeOut(0L, MILLISECONDS);
+         if (emitThreadPoolMetrics) {
+           builder.enableThreadPoolMetrics();
+         }
+         return builder.build();
        case TSERV_SUMMARY_RETRIEVAL_THREADS:
-         return createThreadPool(conf.getCount(p), conf.getCount(p), 60, 
SECONDS,
-             "summary file retriever", emitThreadPoolMetrics);
+         builder = getPoolBuilder("summary file retriever").numCoreThreads(conf.getCount(p))
+             .withTimeOut(60L, MILLISECONDS);
+         if (emitThreadPoolMetrics) {
+           builder.enableThreadPoolMetrics();
+         }
+         return builder.build();
        case TSERV_SUMMARY_REMOTE_THREADS:
-         return createThreadPool(conf.getCount(p), conf.getCount(p), 60, 
SECONDS, "summary remote",
-             emitThreadPoolMetrics);
+         builder = getPoolBuilder("summary remote").numCoreThreads(conf.getCount(p)).withTimeOut(60L,
+             MILLISECONDS);
+         if (emitThreadPoolMetrics) {
+           builder.enableThreadPoolMetrics();
+         }
+         return builder.build();
        case TSERV_SUMMARY_PARTITION_THREADS:
-         return createThreadPool(conf.getCount(p), conf.getCount(p), 60, 
SECONDS,
-             "summary partition", emitThreadPoolMetrics);
+         builder = getPoolBuilder("summary partition").numCoreThreads(conf.getCount(p))
+             .withTimeOut(60L, MILLISECONDS);
+         if (emitThreadPoolMetrics) {
+           builder.enableThreadPoolMetrics();
+         }
+         return builder.build();
        case GC_DELETE_THREADS:
-         return createFixedThreadPool(conf.getCount(p), "deleting", 
emitThreadPoolMetrics);
+         return 
getPoolBuilder("deleting").numCoreThreads(conf.getCount(p)).build();
        default:
          throw new IllegalArgumentException("Unhandled thread pool property: " + p);
      }
diff --cc server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
index 7607d5ceb9,40cdb5811f..f56e7f1622
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
@@@ -324,10 -321,10 +324,10 @@@ public class VolumeManagerImpl implemen
  
    @Override
    public void bulkRename(Map<Path,Path> oldToNewPathMap, int poolSize, String 
poolName,
 -      String transactionId) throws IOException {
 +      FateId fateId) throws IOException {
      List<Future<Void>> results = new ArrayList<>();
-     ExecutorService workerPool =
-         ThreadPools.getServerThreadPools().createFixedThreadPool(poolSize, 
poolName, false);
+     ExecutorService workerPool = 
ThreadPools.getServerThreadPools().getPoolBuilder(poolName)
+         .numCoreThreads(poolSize).build();
      oldToNewPathMap.forEach((oldPath, newPath) -> 
results.add(workerPool.submit(() -> {
        boolean success;
        try {
diff --cc server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/QueueMetrics.java
index c5ba5157dc,0000000000..425bdf4937
mode 100644,000000..100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/QueueMetrics.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/QueueMetrics.java
@@@ -1,155 -1,0 +1,155 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.manager.compaction.coordinator;
 +
 +import static org.apache.accumulo.core.metrics.MetricsUtil.formatString;
 +import static org.apache.accumulo.core.metrics.MetricsUtil.getCommonTags;
 +
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.concurrent.ScheduledExecutorService;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.accumulo.core.metrics.MetricsProducer;
 +import org.apache.accumulo.core.spi.compaction.CompactorGroupId;
 +import org.apache.accumulo.core.util.threads.ThreadPools;
 +import 
org.apache.accumulo.manager.compaction.queue.CompactionJobPriorityQueue;
 +import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.collect.Sets;
 +import com.google.common.collect.Sets.SetView;
 +
 +import io.micrometer.core.instrument.Gauge;
 +import io.micrometer.core.instrument.MeterRegistry;
 +import io.micrometer.core.instrument.Tags;
 +
 +public class QueueMetrics implements MetricsProducer {
 +
 +  private static class QueueMeters {
 +    private final Gauge length;
 +    private final Gauge jobsQueued;
 +    private final Gauge jobsDequeued;
 +    private final Gauge jobsRejected;
 +    private final Gauge jobsLowestPriority;
 +
 +    public QueueMeters(MeterRegistry meterRegistry, CompactorGroupId cgid,
 +        CompactionJobPriorityQueue queue) {
 +      var queueId = formatString(cgid.canonical());
 +
 +      length =
 +          Gauge.builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH, queue, q -> q.getMaxSize())
 +              .description("Length of priority queues")
 +              .tags(Tags.concat(getCommonTags(), "queue.id", 
queueId)).register(meterRegistry);
 +
 +      jobsQueued = Gauge
 +          .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED, queue, q -> q.getQueuedJobs())
 +          .description("Count of queued jobs")
 +          .tags(Tags.concat(getCommonTags(), "queue.id", 
queueId)).register(meterRegistry);
 +
 +      jobsDequeued = Gauge
 +          .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_DEQUEUED, queue,
 +              q -> q.getDequeuedJobs())
 +          .description("Count of jobs dequeued")
 +          .tags(Tags.concat(getCommonTags(), "queue.id", 
queueId)).register(meterRegistry);
 +
 +      jobsRejected = Gauge
 +          .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED, queue,
 +              q -> q.getRejectedJobs())
 +          .description("Count of rejected jobs")
 +          .tags(Tags.concat(getCommonTags(), "queue.id", 
queueId)).register(meterRegistry);
 +
 +      jobsLowestPriority = Gauge
 +          .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY, queue,
 +              q -> q.getLowestPriority())
 +          .description("Lowest priority queued job")
 +          .tags(Tags.concat(getCommonTags(), "queue.id", 
queueId)).register(meterRegistry);
 +    }
 +
 +    private void removeMeters(MeterRegistry registry) {
 +      registry.remove(length);
 +      registry.remove(jobsQueued);
 +      registry.remove(jobsDequeued);
 +      registry.remove(jobsRejected);
 +      registry.remove(jobsLowestPriority);
 +    }
 +  }
 +
 +  private static final Logger LOG = 
LoggerFactory.getLogger(QueueMetrics.class);
 +  private static final long DEFAULT_MIN_REFRESH_DELAY = 
TimeUnit.SECONDS.toMillis(5);
 +  private volatile MeterRegistry meterRegistry = null;
 +  private final CompactionJobQueues compactionJobQueues;
 +  private final Map<CompactorGroupId,QueueMeters> perQueueMetrics = new 
HashMap<>();
 +  private Gauge queueCountMeter = null;
 +
 +  public QueueMetrics(CompactionJobQueues compactionJobQueues) {
 +    this.compactionJobQueues = compactionJobQueues;
-     ScheduledExecutorService scheduler = ThreadPools.getServerThreadPools()
-         .createScheduledExecutorService(1, "queueMetricsPoller", false);
++    ScheduledExecutorService scheduler =
++        ThreadPools.getServerThreadPools().createScheduledExecutorService(1, 
"queueMetricsPoller");
 +    Runtime.getRuntime().addShutdownHook(new Thread(scheduler::shutdownNow));
 +    
ThreadPools.watchNonCriticalScheduledTask(scheduler.scheduleAtFixedRate(this::update,
 +        DEFAULT_MIN_REFRESH_DELAY, DEFAULT_MIN_REFRESH_DELAY, 
TimeUnit.MILLISECONDS));
 +  }
 +
 +  public void update() {
 +
 +    // read the volatile variable once so the rest of the method has 
consistent view
 +    var localRegistry = meterRegistry;
 +
 +    if (queueCountMeter == null) {
 +      queueCountMeter = Gauge
 +          .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUES, compactionJobQueues,
 +              CompactionJobQueues::getQueueCount)
 +          .description("Number of current Queues").tags(getCommonTags()).register(localRegistry);
 +    }
 +    LOG.debug("update - cjq queues: {}", compactionJobQueues.getQueueIds());
 +
 +    Set<CompactorGroupId> definedQueues = compactionJobQueues.getQueueIds();
 +    LOG.debug("update - defined queues: {}", definedQueues);
 +
 +    // Copy the keySet into a new Set so that changes to perQueueMetrics
 +    // don't affect the collection
 +    Set<CompactorGroupId> queuesWithMetrics = new 
HashSet<>(perQueueMetrics.keySet());
 +    LOG.debug("update - queues with metrics: {}", queuesWithMetrics);
 +
 +    SetView<CompactorGroupId> queuesWithoutMetrics =
 +        Sets.difference(definedQueues, queuesWithMetrics);
 +    queuesWithoutMetrics.forEach(q -> {
 +      LOG.debug("update - creating meters for queue: {}", q);
 +      perQueueMetrics.put(q, new QueueMeters(localRegistry, q, 
compactionJobQueues.getQueue(q)));
 +    });
 +
 +    SetView<CompactorGroupId> metricsWithoutQueues =
 +        Sets.difference(queuesWithMetrics, definedQueues);
 +    metricsWithoutQueues.forEach(q -> {
 +      LOG.debug("update - removing meters for queue: {}", q);
 +      perQueueMetrics.get(q).removeMeters(localRegistry);
 +      perQueueMetrics.remove(q);
 +    });
 +
 +  }
 +
 +  @Override
 +  public void registerMetrics(MeterRegistry registry) {
 +    this.meterRegistry = registry;
 +  }
 +}
diff --cc server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java
index 69d485dd91,cb590a3642..d0eeb9b58d
--- a/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java
@@@ -69,13 -69,12 +69,13 @@@ public class RecoveryManager 
  
    public RecoveryManager(Manager manager, long timeToCacheExistsInMillis) {
      this.manager = manager;
 -    existenceCache =
 -        Caffeine.newBuilder().expireAfterWrite(timeToCacheExistsInMillis, 
TimeUnit.MILLISECONDS)
 -            .maximumWeight(10_000_000).weigher((path, exist) -> 
path.toString().length()).build();
 +    existenceCache = this.manager.getContext().getCaches()
 +        .createNewBuilder(CacheName.RECOVERY_MANAGER_PATH_CACHE, true)
 +        .expireAfterWrite(timeToCacheExistsInMillis, TimeUnit.MILLISECONDS)
 +        .maximumWeight(10_000_000).weigher((path, exist) -> 
path.toString().length()).build();
  
-     executor = 
ThreadPools.getServerThreadPools().createScheduledExecutorService(4,
-         "Walog sort starter", false);
+     executor =
+         ThreadPools.getServerThreadPools().createScheduledExecutorService(4, 
"Walog sort starter");
      zooCache = new ZooCache(manager.getContext().getZooReader(), null);
      try {
        List<String> workIDs =
diff --cc server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java
index 3acdfe13bf,0000000000..d8e0a87a40
mode 100644,000000..100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java
@@@ -1,120 -1,0 +1,121 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.manager.split;
 +
 +import java.util.Objects;
 +import java.util.Set;
 +import java.util.concurrent.ArrayBlockingQueue;
 +import java.util.concurrent.BlockingQueue;
 +import java.util.concurrent.ThreadPoolExecutor;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.TableId;
 +import org.apache.accumulo.core.metadata.TabletFile;
 +import org.apache.accumulo.core.util.cache.Caches.CacheName;
 +import org.apache.accumulo.server.ServerContext;
 +import org.apache.accumulo.server.conf.TableConfiguration;
 +import org.apache.accumulo.server.util.FileUtil;
 +import org.apache.accumulo.server.util.FileUtil.FileInfo;
 +
 +import com.github.benmanes.caffeine.cache.CacheLoader;
 +import com.github.benmanes.caffeine.cache.LoadingCache;
 +import com.github.benmanes.caffeine.cache.Weigher;
 +
 +public class Splitter {
 +
 +  private final ThreadPoolExecutor splitExecutor;
 +
 +  private static class CacheKey {
 +
 +    final TableId tableId;
 +    final TabletFile tabletFile;
 +
 +    public CacheKey(TableId tableId, TabletFile tabletFile) {
 +      this.tableId = tableId;
 +      this.tabletFile = tabletFile;
 +    }
 +
 +    @Override
 +    public boolean equals(Object o) {
 +      if (this == o) {
 +        return true;
 +      }
 +      if (o == null || getClass() != o.getClass()) {
 +        return false;
 +      }
 +      CacheKey cacheKey = (CacheKey) o;
 +      return Objects.equals(tableId, cacheKey.tableId)
 +          && Objects.equals(tabletFile, cacheKey.tabletFile);
 +    }
 +
 +    @Override
 +    public int hashCode() {
 +      return Objects.hash(tableId, tabletFile);
 +    }
 +
 +  }
 +
 +  LoadingCache<CacheKey,FileInfo> splitFileCache;
 +
 +  public Splitter(ServerContext context) {
 +    int numThreads = 
context.getConfiguration().getCount(Property.MANAGER_SPLIT_WORKER_THREADS);
 +    // Set up thread pool that constrains the number of tasks it queues and 
when full discards tasks.
 +    // The purpose of this is to avoid reading lots of data into memory if 
lots of tablets need to
 +    // split.
 +    BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(10000);
-     this.splitExecutor = context.threadPools().createThreadPool(numThreads, 
numThreads, 0,
-         TimeUnit.MILLISECONDS, "split_seeder", queue, true);
++    this.splitExecutor = context.threadPools().getPoolBuilder("split_seeder")
++        .numCoreThreads(numThreads).numMaxThreads(numThreads).withTimeOut(0L, 
TimeUnit.MILLISECONDS)
++        .withQueue(queue).enableThreadPoolMetrics().build();
 +
 +    // Discard tasks when the queue is full; this allows the TGW to continue 
processing tasks other
 +    // than splits.
 +    this.splitExecutor.setRejectedExecutionHandler(new 
ThreadPoolExecutor.DiscardPolicy());
 +
 +    Weigher<CacheKey,
 +        FileInfo> weigher = (key, info) -> key.tableId.canonical().length()
 +            + key.tabletFile.getPath().toString().length() + 
info.getFirstRow().getLength()
 +            + info.getLastRow().getLength();
 +
 +    CacheLoader<CacheKey,FileInfo> loader = key -> {
 +      TableConfiguration tableConf = 
context.getTableConfiguration(key.tableId);
 +      return FileUtil.tryToGetFirstAndLastRows(context, tableConf, 
Set.of(key.tabletFile))
 +          .get(key.tabletFile);
 +    };
 +
 +    splitFileCache = 
context.getCaches().createNewBuilder(CacheName.SPLITTER_FILES, true)
 +        .expireAfterAccess(10, 
TimeUnit.MINUTES).maximumWeight(10_000_000L).weigher(weigher)
 +        .build(loader);
 +
 +  }
 +
 +  public synchronized void start() {}
 +
 +  public synchronized void stop() {
 +    splitExecutor.shutdownNow();
 +  }
 +
 +  public FileInfo getCachedFileInfo(TableId tableId, TabletFile tabletFile) {
 +    return splitFileCache.get(new CacheKey(tableId, tabletFile));
 +  }
 +
 +  public void initiateSplit(SeedSplitTask seedSplitTask) {
 +    splitExecutor.execute(seedSplitTask);
 +  }
 +}
diff --cc 
server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java
index d2bc8c9b91,0000000000..a5cdbe847f
mode 100644,000000..100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java
@@@ -1,190 -1,0 +1,190 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.manager.tableOps.bulkVer2;
 +
 +import static java.util.stream.Collectors.groupingBy;
 +import static java.util.stream.Collectors.mapping;
 +import static java.util.stream.Collectors.toList;
 +
 +import java.time.Duration;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.concurrent.ExecutionException;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.ThreadPoolExecutor;
 +import java.util.function.Predicate;
 +import java.util.function.Supplier;
 +
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.TableId;
 +import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
 +import org.apache.accumulo.core.fate.FateId;
 +import org.apache.accumulo.core.metadata.TServerInstance;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
 +import org.apache.accumulo.core.rpc.ThriftUtil;
 +import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
 +import org.apache.accumulo.core.tabletserver.thrift.TabletServerClientService;
 +import org.apache.accumulo.core.trace.TraceUtil;
 +import org.apache.accumulo.core.util.Retry;
 +import org.apache.accumulo.server.ServerContext;
 +import org.apache.thrift.TException;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.collect.Iterators;
 +
 +public class TabletRefresher {
 +
 +  private static final Logger log = 
LoggerFactory.getLogger(TabletRefresher.class);
 +
 +  public static void refresh(ServerContext context,
 +      Supplier<Set<TServerInstance>> onlineTserversSupplier, FateId fateId, 
TableId tableId,
 +      byte[] startRow, byte[] endRow, Predicate<TabletMetadata> needsRefresh) 
{
 +
 +    // ELASTICITY_TODO should this thread pool be configurable?
 +    ThreadPoolExecutor threadPool =
-         context.threadPools().createFixedThreadPool(10, "Tablet refresh " + 
fateId, false);
++        context.threadPools().getPoolBuilder("Tablet refresh " + 
fateId).numCoreThreads(10).build();
 +
 +    try (var tablets = context.getAmple().readTablets().forTable(tableId)
 +        .overlapping(startRow, endRow).checkConsistency()
 +        .fetch(ColumnType.LOADED, ColumnType.LOCATION, 
ColumnType.PREV_ROW).build()) {
 +
 +      // Find all tablets that need to refresh their metadata. There may be 
some tablets that were
 +      // hosted after the tablet files were updated, it just results in an 
unneeded refresh
 +      // request. There may also be tablets that had a location when the 
files were set but do not
 +      // have a location now, that is ok the next time that tablet loads 
somewhere it will see the
 +      // files.
 +
 +      var tabletIterator =
 +          tablets.stream().filter(tabletMetadata -> 
tabletMetadata.getLocation() != null)
 +              .filter(needsRefresh).iterator();
 +
 +      // avoid reading all tablets into memory and instead process batches of 
1000 tablets at a time
 +      Iterators.partition(tabletIterator, 1000).forEachRemaining(batch -> {
 +        var refreshesNeeded = 
batch.stream().collect(groupingBy(TabletMetadata::getLocation,
 +            mapping(tabletMetadata -> tabletMetadata.getExtent().toThrift(), 
toList())));
 +
 +        refreshTablets(threadPool, fateId.canonical(), context, 
onlineTserversSupplier,
 +            refreshesNeeded);
 +      });
 +
 +    } finally {
 +      threadPool.shutdownNow();
 +    }
 +
 +  }
 +
 +  public static void refreshTablets(ExecutorService threadPool, String logId, 
ServerContext context,
 +      Supplier<Set<TServerInstance>> onlineTserversSupplier,
 +      Map<TabletMetadata.Location,List<TKeyExtent>> refreshesNeeded) {
 +
 +    // make a copy as it will be mutated in this method
 +    refreshesNeeded = new HashMap<>(refreshesNeeded);
 +
 +    Retry retry = 
Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100))
 +        
.incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(1)).backOffFactor(1.5)
 +        .logInterval(Duration.ofMinutes(3)).createRetry();
 +
 +    while (!refreshesNeeded.isEmpty()) {
 +
 +      Map<TabletMetadata.Location,Future<List<TKeyExtent>>> futures = new 
HashMap<>();
 +
 +      for (Map.Entry<TabletMetadata.Location,List<TKeyExtent>> entry : 
refreshesNeeded.entrySet()) {
 +
 +        // Ask tablet server to reload the metadata for these tablets. The 
tablet server returns
 +        // the list of extents it was hosting but was unable to refresh (the 
tablets could be in
 +        // the process of loading). If it is not currently hosting the tablet 
it treats that as
 +        // refreshed and does not return anything for it.
 +        Future<List<TKeyExtent>> future = threadPool
 +            .submit(() -> sendSyncRefreshRequest(context, logId, 
entry.getKey(), entry.getValue()));
 +
 +        futures.put(entry.getKey(), future);
 +      }
 +
 +      for (Map.Entry<TabletMetadata.Location,Future<List<TKeyExtent>>> entry 
: futures.entrySet()) {
 +        TabletMetadata.Location location = entry.getKey();
 +        Future<List<TKeyExtent>> future = entry.getValue();
 +
 +        List<TKeyExtent> nonRefreshedExtents = null;
 +        try {
 +          nonRefreshedExtents = future.get();
 +        } catch (InterruptedException | ExecutionException e) {
 +          throw new RuntimeException(e);
 +        }
 +        if (nonRefreshedExtents.isEmpty()) {
 +          // tablet server was able to refresh everything, so remove that 
location
 +          refreshesNeeded.remove(location);
 +        } else {
 +          // tablet server could not refresh some tablets, try them again 
later.
 +          refreshesNeeded.put(location, nonRefreshedExtents);
 +        }
 +      }
 +
 +      // look for any tservers that have died since we read the metadata 
table and remove them
 +      if (!refreshesNeeded.isEmpty()) {
 +        Set<TServerInstance> liveTservers = onlineTserversSupplier.get();
 +
 +        refreshesNeeded.keySet()
 +            .removeIf(location -> 
!liveTservers.contains(location.getServerInstance()));
 +      }
 +
 +      if (!refreshesNeeded.isEmpty()) {
 +        try {
 +          retry.waitForNextAttempt(log, logId + " waiting for " + 
refreshesNeeded.size()
 +              + " tservers to refresh their tablets metadata");
 +        } catch (InterruptedException e) {
 +          throw new RuntimeException(e);
 +        }
 +      }
 +    }
 +  }
 +
 +  private static List<TKeyExtent> sendSyncRefreshRequest(ServerContext 
context, String logId,
 +      TabletMetadata.Location location, List<TKeyExtent> refreshes) {
 +    TabletServerClientService.Client client = null;
 +    try {
 +      log.trace("{} sending refresh request to {} for {} extents", logId, 
location,
 +          refreshes.size());
 +      var timeInMillis = 
context.getConfiguration().getTimeInMillis(Property.MANAGER_BULK_TIMEOUT);
 +      client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, 
location.getHostAndPort(),
 +          context, timeInMillis);
 +
 +      var unrefreshed = client.refreshTablets(TraceUtil.traceInfo(), 
context.rpcCreds(), refreshes);
 +
 +      log.trace("{} refresh request to {} returned {} unrefreshed extents", 
logId, location,
 +          unrefreshed.size());
 +
 +      return unrefreshed;
 +    } catch (TException ex) {
 +      log.debug("rpc failed server: " + location + ", " + logId + " " + 
ex.getMessage(), ex);
 +
 +      // ELASTICITY_TODO are there any other exceptions we should catch in 
this method and check if
 +      // the tserver is still alive?
 +
 +      // something went wrong w/ RPC return all extents as unrefreshed
 +      return refreshes;
 +    } finally {
 +      ThriftUtil.returnClient(client, context);
 +    }
 +  }
 +}
diff --cc 
server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
index a9ed1fd6be,84e69d561f..22fe50c0c9
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
@@@ -18,7 -18,7 +18,8 @@@
   */
  package org.apache.accumulo.manager.upgrade;
  
+ import static java.util.concurrent.TimeUnit.SECONDS;
 +import static 
org.apache.accumulo.server.AccumuloDataVersion.METADATA_FILE_JSON_ENCODING;
  import static 
org.apache.accumulo.server.AccumuloDataVersion.REMOVE_DEPRECATIONS_FOR_VERSION_3;
  import static 
org.apache.accumulo.server.AccumuloDataVersion.ROOT_TABLET_META_CHANGES;
  
diff --cc 
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index d8ea4d78c5,de47b841fd..0a86efdf7a
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@@ -302,8 -307,17 +305,9 @@@ public class TabletServerResourceManage
          () -> 
context.getConfiguration().getCount(Property.TSERV_MINC_MAXCONCURRENT),
          "minor compactor", minorCompactionThreadPool);
  
-     defaultMigrationPool = 
ThreadPools.getServerThreadPools().createThreadPool(0, 1, 60,
-         TimeUnit.SECONDS, "metadata tablet migration", true);
 -    splitThreadPool =
 -        
ThreadPools.getServerThreadPools().getPoolBuilder("splitter").numCoreThreads(0)
 -            .numMaxThreads(1).withTimeOut(1, 
SECONDS).enableThreadPoolMetrics().build();
 -
 -    defaultSplitThreadPool =
 -        ThreadPools.getServerThreadPools().getPoolBuilder("md 
splitter").numCoreThreads(0)
 -            .numMaxThreads(1).withTimeOut(60, 
SECONDS).enableThreadPoolMetrics().build();
 -
 -    defaultMigrationPool = ThreadPools.getServerThreadPools()
 -        .getPoolBuilder("metadata tablet 
migration").numCoreThreads(0).numMaxThreads(1)
 -        .withTimeOut(60, SECONDS).enableThreadPoolMetrics().build();
++    defaultMigrationPool =
++        ThreadPools.getServerThreadPools().getPoolBuilder("metadata tablet 
migration")
++            .numCoreThreads(0).numMaxThreads(1).withTimeOut(60L, 
SECONDS).build();
  
      migrationPool = 
ThreadPools.getServerThreadPools().createExecutorService(acuConf,
          Property.TSERV_MIGRATE_MAXCONCURRENT, true);
diff --cc 
server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
index dda829d0a8,2b5a6c6135..030720c93d
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
@@@ -291,30 -290,13 +291,31 @@@ public class LogSorter 
      }
    }
  
 -  public void startWatchingForRecoveryLogs() throws KeeperException, 
InterruptedException {
 -    int threadPoolSize = 
this.conf.getCount(Property.TSERV_WAL_SORT_MAX_CONCURRENT);
 +  /**
 +   * Sort any logs that need sorting in the current thread.
 +   *
 +   * @return The time in millis when the next check can be done.
 +   */
 +  public long sortLogsIfNeeded() throws KeeperException, InterruptedException 
{
 +    DistributedWorkQueue dwq = new DistributedWorkQueue(
 +        context.getZooKeeperRoot() + Constants.ZRECOVERY, sortedLogConf, 
context);
 +    dwq.processExistingWork(new LogProcessor(), 
MoreExecutors.newDirectExecutorService(), 1, false);
 +    return System.currentTimeMillis() + dwq.getCheckInterval();
 +  }
 +
 +  /**
 +   * Sort any logs that need sorting in a ThreadPool using
 +   * {@link Property#TSERV_WAL_SORT_MAX_CONCURRENT} threads. This method will 
start a background
 +   * thread to look for log sorting work in the future that will be processed 
by the
 +   * ThreadPoolExecutor
 +   */
 +  public void startWatchingForRecoveryLogs(int threadPoolSize)
 +      throws KeeperException, InterruptedException {
-     ThreadPoolExecutor threadPool = ThreadPools.getServerThreadPools()
-         .createFixedThreadPool(threadPoolSize, this.getClass().getName(), 
true);
+     ThreadPoolExecutor threadPool =
+         
ThreadPools.getServerThreadPools().getPoolBuilder(this.getClass().getName())
+             .numCoreThreads(threadPoolSize).enableThreadPoolMetrics().build();
      new DistributedWorkQueue(context.getZooKeeperRoot() + 
Constants.ZRECOVERY, sortedLogConf,
 -        context).startProcessing(new LogProcessor(), threadPool);
 +        context).processExistingAndFuture(new LogProcessor(), threadPool);
    }
  
    public List<RecoveryStatus> getLogSorts() {


Reply via email to