This is an automated email from the ASF dual-hosted git repository.

ddanielr pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 7b3eeb9444835cadf657219720d5bb613fb1978f
Merge: 102a9744bb 9ec22b4b03
Author: Daniel Roberts <ddani...@gmail.com>
AuthorDate: Sun Jul 28 23:48:50 2024 +0000

    Merge branch 'main' into elasticity

 .../accumulo/core/clientImpl/ClientContext.java    | 10 +--
 .../core/clientImpl/ConditionalWriterImpl.java     |  3 +-
 .../core/clientImpl/InstanceOperationsImpl.java    |  3 +-
 .../core/clientImpl/TableOperationsImpl.java       |  6 +-
 .../core/clientImpl/TabletServerBatchReader.java   |  8 ++-
 .../core/clientImpl/TabletServerBatchWriter.java   |  8 ++-
 .../accumulo/core/clientImpl/bulk/BulkImport.java  | 12 ++--
 .../accumulo/core/file/BloomFilterLayer.java       |  3 +-
 .../util/compaction/ExternalCompactionUtil.java    |  7 ++-
 .../core/util/threads/ThreadPoolNames.java         | 71 ++++++++++++++++++++++
 .../accumulo/core/util/threads/ThreadPools.java    | 60 +++++++++++++-----
 .../core/file/rfile/MultiThreadedRFileTest.java    |  2 +-
 .../threads/ThreadPoolExecutorBuilderTest.java     | 14 ++---
 .../conf/store/impl/PropCacheCaffeineImpl.java     |  4 +-
 .../server/conf/store/impl/PropStoreWatcher.java   |  2 +-
 .../accumulo/server/problems/ProblemReports.java   |  2 +-
 .../apache/accumulo/server/rpc/TServerUtils.java   | 12 ++--
 .../server/util/RemoveEntriesForMissingFiles.java  |  6 +-
 .../server/util/VerifyTabletAssignments.java       |  6 +-
 .../server/conf/store/impl/ReadyMonitorTest.java   |  2 +-
 .../manager/tableOps/bulkVer2/BulkImportMove.java  |  4 +-
 .../tableOps/tableImport/MoveExportedFiles.java    |  4 +-
 .../manager/upgrade/UpgradeCoordinator.java        |  6 +-
 .../org/apache/accumulo/tserver/ScanServer.java    |  5 +-
 .../tserver/TabletServerResourceManager.java       | 43 ++++++++-----
 .../org/apache/accumulo/tserver/log/LogSorter.java |  3 +-
 .../accumulo/tserver/log/TabletServerLogger.java   |  3 +-
 .../accumulo/test/BalanceWithOfflineTableIT.java   |  4 +-
 .../test/functional/BatchWriterFlushIT.java        |  2 +-
 29 files changed, 231 insertions(+), 84 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
index 07ff915c95,62efab71f5..80282318f4
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
@@@ -24,6 -24,9 +24,8 @@@ import static com.google.common.base.Su
  import static java.nio.charset.StandardCharsets.UTF_8;
  import static java.util.concurrent.TimeUnit.MILLISECONDS;
  import static java.util.concurrent.TimeUnit.SECONDS;
 -import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
+ import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.CONDITIONAL_WRITER_CLEANUP_POOL;
+ import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.SCANNER_READ_AHEAD_POOL;
  
  import java.lang.Thread.UncaughtExceptionHandler;
  import java.net.URL;
diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
index 610e5865fc,00ebbeb363..a4445d48d7
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
@@@ -377,8 -378,8 +378,8 @@@ public class ConditionalWriterImpl impl
      this.auths = config.getAuthorizations();
      this.accessEvaluator = 
AccessEvaluator.of(config.getAuthorizations().toAccessAuthorizations());
      this.threadPool = context.threadPools().createScheduledExecutorService(
-         config.getMaxWriteThreads(), this.getClass().getSimpleName());
+         config.getMaxWriteThreads(), CONDITIONAL_WRITER_POOL.poolName);
 -    this.locator = new SyncingTabletLocator(context, tableId);
 +    this.locator = new SyncingClientTabletCache(context, tableId);
      this.serverQueues = new HashMap<>();
      this.tableId = tableId;
      this.tableName = tableName;
diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
index 43b108e1d1,91a1248c97..063c96c238
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
@@@ -40,7 -30,7 +40,9 @@@ import static org.apache.accumulo.core.
  import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
  import static org.apache.accumulo.core.util.Validators.EXISTING_TABLE_NAME;
  import static org.apache.accumulo.core.util.Validators.NEW_TABLE_NAME;
 -import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.SPLIT_POOL;
 +import static org.apache.accumulo.core.util.Validators.NOT_BUILTIN_TABLE;
++import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.SPLIT_START_POOL;
++import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.SPLIT_WAIT_POOL;
  
  import java.io.BufferedReader;
  import java.io.FileNotFoundException;
@@@ -466,161 -423,192 +468,161 @@@ public class TableOperationsImpl extend
      }
    }
  
 -  private static class SplitEnv {
 -    private final String tableName;
 -    private final TableId tableId;
 -    private final ExecutorService executor;
 -    private final CountDownLatch latch;
 -    private final AtomicReference<Exception> exception;
 +  /**
 +   * On the server side the fate operation will exit w/o an error if the tablet requested to split
 +   * does not exist. When this happens it will also return an empty string. In the case where the
 +   * fate operation successfully splits the tablet it will return the following string. This code
 +   * uses this return value to see if it needs to retry finding the tablet.
 +   */
 +  public static final String SPLIT_SUCCESS_MSG = "SPLIT_SUCCEEDED";
  
 -    SplitEnv(String tableName, TableId tableId, ExecutorService executor, 
CountDownLatch latch,
 -        AtomicReference<Exception> exception) {
 -      this.tableName = tableName;
 -      this.tableId = tableId;
 -      this.executor = executor;
 -      this.latch = latch;
 -      this.exception = exception;
 -    }
 -  }
 +  @Override
 +  public void addSplits(String tableName, SortedSet<Text> splits)
 +      throws AccumuloException, TableNotFoundException, 
AccumuloSecurityException {
  
 -  private class SplitTask implements Runnable {
 +    EXISTING_TABLE_NAME.validate(tableName);
  
 -    private List<Text> splits;
 -    private SplitEnv env;
 +    TableId tableId = context.getTableId(tableName);
  
 -    SplitTask(SplitEnv env, List<Text> splits) {
 -      this.env = env;
 -      this.splits = splits;
 -    }
 +    // TODO should there be a server side check for this?
 +    context.requireNotOffline(tableId, tableName);
  
 -    @Override
 -    public void run() {
 -      try {
 -        if (env.exception.get() != null) {
 -          return;
 -        }
 +    ClientTabletCache tabLocator = ClientTabletCache.getInstance(context, 
tableId);
  
 -        if (splits.size() <= 2) {
 -          addSplits(env, new TreeSet<>(splits));
 -          splits.forEach(s -> env.latch.countDown());
 -          return;
 -        }
 +    SortedSet<Text> splitsTodo = Collections.synchronizedSortedSet(new 
TreeSet<>(splits));
  
 -        int mid = splits.size() / 2;
 +    final ByteBuffer EMPTY = ByteBuffer.allocate(0);
  
 -        // split the middle split point to ensure that child task split
 -        // different tablets and can therefore run in parallel
 -        addSplits(env, new TreeSet<>(splits.subList(mid, mid + 1)));
 -        env.latch.countDown();
 +    ExecutorService startExecutor =
-         
context.threadPools().getPoolBuilder("addSplitsStart").numCoreThreads(16).build();
++        
context.threadPools().getPoolBuilder(SPLIT_START_POOL).numCoreThreads(16).build();
 +    ExecutorService waitExecutor =
-         
context.threadPools().getPoolBuilder("addSplitsWait").numCoreThreads(16).build();
++        
context.threadPools().getPoolBuilder(SPLIT_WAIT_POOL).numCoreThreads(16).build();
  
 -        env.executor.execute(new SplitTask(env, splits.subList(0, mid)));
 -        env.executor.execute(new SplitTask(env, splits.subList(mid + 1, 
splits.size())));
 +    while (!splitsTodo.isEmpty()) {
  
 -      } catch (Exception t) {
 -        env.exception.compareAndSet(null, t);
 -      }
 -    }
 +      tabLocator.invalidateCache();
  
 -  }
 +      Map<KeyExtent,List<Text>> tabletSplits =
 +          mapSplitsToTablets(tableName, tableId, tabLocator, splitsTodo);
  
 -  @Override
 -  public void addSplits(String tableName, SortedSet<Text> partitionKeys)
 -      throws TableNotFoundException, AccumuloException, 
AccumuloSecurityException {
 -    EXISTING_TABLE_NAME.validate(tableName);
 +      List<CompletableFuture<Void>> futures = new ArrayList<>();
  
 -    TableId tableId = context.getTableId(tableName);
 -    List<Text> splits = new ArrayList<>(partitionKeys);
 +      // begin the fate operation for each tablet without waiting for the 
operation to complete
 +      for (Entry<KeyExtent,List<Text>> splitsForTablet : 
tabletSplits.entrySet()) {
 +        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
 +          var extent = splitsForTablet.getKey();
  
 -    // should be sorted because we copied from a sorted set, but that makes
 -    // assumptions about how the copy was done so resort to be sure.
 -    Collections.sort(splits);
 -    CountDownLatch latch = new CountDownLatch(splits.size());
 -    AtomicReference<Exception> exception = new AtomicReference<>(null);
 +          List<ByteBuffer> args = new ArrayList<>();
 +          
args.add(ByteBuffer.wrap(extent.tableId().canonical().getBytes(UTF_8)));
 +          args.add(extent.endRow() == null ? EMPTY : 
TextUtil.getByteBuffer(extent.endRow()));
 +          args.add(
 +              extent.prevEndRow() == null ? EMPTY : 
TextUtil.getByteBuffer(extent.prevEndRow()));
 +          splitsForTablet.getValue().forEach(split -> 
args.add(TextUtil.getByteBuffer(split)));
  
 -    ExecutorService executor =
 -        
context.threadPools().getPoolBuilder(SPLIT_POOL).numCoreThreads(16).build();
 -    try {
 -      executor.execute(
 -          new SplitTask(new SplitEnv(tableName, tableId, executor, latch, 
exception), splits));
 -
 -      while (!latch.await(100, MILLISECONDS)) {
 -        if (exception.get() != null) {
 -          executor.shutdownNow();
 -          Throwable excep = exception.get();
 -          // Below all exceptions are wrapped and rethrown. This is done so 
that the user knows what
 -          // code path got them here. If the wrapping was not done, the
 -          // user would only have the stack trace for the background thread.
 -          if (excep instanceof TableNotFoundException) {
 -            TableNotFoundException tnfe = (TableNotFoundException) excep;
 -            throw new TableNotFoundException(tableId.canonical(), tableName,
 -                "Table not found by background thread", tnfe);
 -          } else if (excep instanceof TableOfflineException) {
 -            log.debug("TableOfflineException occurred in background thread. 
Throwing new exception",
 -                excep);
 -            throw new TableOfflineException(tableId, tableName);
 -          } else if (excep instanceof AccumuloSecurityException) {
 -            // base == background accumulo security exception
 -            AccumuloSecurityException base = (AccumuloSecurityException) 
excep;
 -            throw new AccumuloSecurityException(base.getUser(), 
base.asThriftException().getCode(),
 -                base.getTableInfo(), excep);
 -          } else if (excep instanceof AccumuloServerException) {
 -            throw new AccumuloServerException((AccumuloServerException) 
excep);
 -          } else if (excep instanceof Error) {
 -            throw new Error(excep);
 -          } else {
 -            throw new AccumuloException(excep);
 +          try {
 +            return handleFateOperation(() -> {
 +              TFateInstanceType t = 
FateInstanceType.fromNamespaceOrTableName(tableName).toThrift();
 +              TFateId opid = beginFateOperation(t);
 +              executeFateOperation(opid, FateOperation.TABLE_SPLIT, args, 
Map.of(), false);
 +              return new Pair<>(opid, splitsForTablet.getValue());
 +            }, tableName);
 +          } catch (TableExistsException | NamespaceExistsException | 
NamespaceNotFoundException
 +              | AccumuloSecurityException | TableNotFoundException | 
AccumuloException e) {
 +            // This exception type is used because it makes it easier in the 
foreground thread to do
 +            // exception analysis when using CompletableFuture.
 +            throw new CompletionException(e);
            }
 -        }
 -      }
 -    } catch (InterruptedException e) {
 -      throw new IllegalStateException(e);
 -    } finally {
 -      executor.shutdown();
 -    }
 -  }
 -
 -  private void addSplits(SplitEnv env, SortedSet<Text> partitionKeys) throws 
AccumuloException,
 -      AccumuloSecurityException, TableNotFoundException, 
AccumuloServerException {
 +          // wait for the fate operation to complete in a separate thread pool
 +        }, startExecutor).thenApplyAsync(pair -> {
 +          final TFateId opid = pair.getFirst();
 +          final List<Text> completedSplits = pair.getSecond();
  
 -    TabletLocator tabLocator = TabletLocator.getLocator(context, env.tableId);
 -    for (Text split : partitionKeys) {
 -      boolean successful = false;
 -      int attempt = 0;
 -      long locationFailures = 0;
 +          try {
 +            String status = handleFateOperation(() -> 
waitForFateOperation(opid), tableName);
  
 -      while (!successful) {
 +            if (SPLIT_SUCCESS_MSG.equals(status)) {
 +              completedSplits.forEach(splitsTodo::remove);
 +            }
 +          } catch (TableExistsException | NamespaceExistsException | 
NamespaceNotFoundException
 +              | AccumuloSecurityException | TableNotFoundException | 
AccumuloException e) {
 +            // This exception type is used because it makes it easier in the 
foreground thread to do
 +            // exception analysis when using CompletableFuture.
 +            throw new CompletionException(e);
 +          } finally {
 +            // always finish table op, even when exception
 +            if (opid != null) {
 +              try {
 +                finishFateOperation(opid);
 +              } catch (Exception e) {
 +                log.warn("Exception thrown while finishing fate table 
operation", e);
 +              }
 +            }
 +          }
 +          return null;
 +        }, waitExecutor);
 +        futures.add(future);
 +      }
  
 -        if (attempt > 0) {
 -          sleepUninterruptibly(100, MILLISECONDS);
 +      try {
 +        futures.forEach(CompletableFuture::join);
 +      } catch (CompletionException ee) {
 +        Throwable excep = ee.getCause();
 +        // Below all exceptions are wrapped and rethrown. This is done so 
that the user knows
 +        // what code path got them here. If the wrapping was not done, the 
user would only
 +        // have the stack trace for the background thread.
 +        if (excep instanceof TableNotFoundException) {
 +          TableNotFoundException tnfe = (TableNotFoundException) excep;
 +          throw new TableNotFoundException(tableId.canonical(), tableName,
 +              "Table not found by background thread", tnfe);
 +        } else if (excep instanceof TableOfflineException) {
 +          log.debug("TableOfflineException occurred in background thread. 
Throwing new exception",
 +              excep);
 +          throw new TableOfflineException(tableId, tableName);
 +        } else if (excep instanceof AccumuloSecurityException) {
 +          // base == background accumulo security exception
 +          AccumuloSecurityException base = (AccumuloSecurityException) excep;
 +          throw new AccumuloSecurityException(base.getUser(), 
base.asThriftException().getCode(),
 +              base.getTableInfo(), excep);
 +        } else if (excep instanceof AccumuloServerException) {
 +          throw new AccumuloServerException((AccumuloServerException) excep);
 +        } else {
 +          throw new AccumuloException(excep);
          }
 +      }
  
 -        attempt++;
 +    }
 +    startExecutor.shutdown();
 +    waitExecutor.shutdown();
 +  }
  
 -        TabletLocation tl = tabLocator.locateTablet(context, split, false, 
false);
 +  private Map<KeyExtent,List<Text>> mapSplitsToTablets(String tableName, 
TableId tableId,
 +      ClientTabletCache tabLocator, SortedSet<Text> splitsTodo)
 +      throws AccumuloException, AccumuloSecurityException, 
TableNotFoundException {
 +    Map<KeyExtent,List<Text>> tabletSplits = new HashMap<>();
  
 -        if (tl == null) {
 -          context.requireTableExists(env.tableId, env.tableName);
 -          context.requireNotOffline(env.tableId, env.tableName);
 -          continue;
 -        }
 +    var iterator = splitsTodo.iterator();
 +    while (iterator.hasNext()) {
 +      var split = iterator.next();
  
 -        HostAndPort address = HostAndPort.fromString(tl.getTserverLocation());
 +      try {
 +        Retry retry = 
Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100))
 +            
.incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(2)).backOffFactor(1.5)
 +            .logInterval(Duration.ofMinutes(3)).createRetry();
  
 -        try {
 -          TabletManagementClientService.Client client =
 -              ThriftUtil.getClient(ThriftClientTypes.TABLET_MGMT, address, 
context);
 +        var tablet = tabLocator.findTablet(context, split, false, 
LocationNeed.NOT_REQUIRED);
 +        while (tablet == null) {
 +          context.requireTableExists(tableId, tableName);
            try {
 -
 -            OpTimer timer = null;
 -
 -            if (log.isTraceEnabled()) {
 -              log.trace("tid={} Splitting tablet {} on {} at {}", 
Thread.currentThread().getId(),
 -                  tl.getExtent(), address, split);
 -              timer = new OpTimer().start();
 -            }
 -
 -            client.splitTablet(TraceUtil.traceInfo(), context.rpcCreds(), 
tl.getExtent().toThrift(),
 -                TextUtil.getByteBuffer(split));
 -
 -            // just split it, might as well invalidate it in the cache
 -            tabLocator.invalidateCache(tl.getExtent());
 -
 -            if (timer != null) {
 -              timer.stop();
 -              log.trace("Split tablet in {}", String.format("%.3f secs", 
timer.scale(SECONDS)));
 -            }
 -
 -          } finally {
 -            ThriftUtil.returnClient(client, context);
 -          }
 -
 -        } catch (TApplicationException tae) {
 -          throw new AccumuloServerException(address.toString(), tae);
 -        } catch (ThriftSecurityException e) {
 -          context.clearTableListCache();
 -          context.requireTableExists(env.tableId, env.tableName);
 -          throw new AccumuloSecurityException(e.user, e.code, e);
 -        } catch (NotServingTabletException e) {
 -          // Do not silently spin when we repeatedly fail to get the location 
for a tablet
 -          locationFailures++;
 -          if (locationFailures == 5 || locationFailures % 50 == 0) {
 -            log.warn("Having difficulty locating hosting tabletserver for 
split {} on table {}."
 -                + " Seen {} failures.", split, env.tableName, 
locationFailures);
 +            retry.waitForNextAttempt(log, "Find tablet in " + tableId + " 
containing " + split);
 +          } catch (InterruptedException e) {
 +            throw new RuntimeException(e);
            }
 +          tablet = tabLocator.findTablet(context, split, false, 
LocationNeed.NOT_REQUIRED);
 +        }
  
 -          tabLocator.invalidateCache(tl.getExtent());
 -          continue;
 -        } catch (TException e) {
 -          tabLocator.invalidateCache(context, tl.getTserverLocation());
 +        if (split.equals(tablet.getExtent().endRow())) {
 +          // split already exists, so remove it
 +          iterator.remove();
            continue;
          }
  
diff --cc core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
index b52ed9f867,c993cf4cb1..5aa91afaa1
--- a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
@@@ -221,11 -224,10 +224,11 @@@ public class ExternalCompactionUtil 
    public static List<RunningCompaction> 
getCompactionsRunningOnCompactors(ClientContext context) {
      final List<RunningCompactionFuture> rcFutures = new ArrayList<>();
      final ExecutorService executor = ThreadPools.getServerThreadPools()
-         
.getPoolBuilder("CompactorRunningCompactions").numCoreThreads(16).build();
+         
.getPoolBuilder(COMPACTOR_RUNNING_COMPACTIONS_POOL).numCoreThreads(16).build();
 -    getCompactorAddrs(context).forEach((q, hp) -> {
 +
 +    getCompactorAddrs(context).forEach((group, hp) -> {
        hp.forEach(hostAndPort -> {
 -        rcFutures.add(new RunningCompactionFuture(q, hostAndPort,
 +        rcFutures.add(new RunningCompactionFuture(group, hostAndPort,
              executor.submit(() -> getRunningCompaction(hostAndPort, 
context))));
        });
      });
diff --cc core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java
index 0000000000,a27ae864ea..c000defb00
mode 000000,100644..100644
--- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java
@@@ -1,0 -1,70 +1,71 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   https://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.accumulo.core.util.threads;
+ 
+ public enum ThreadPoolNames {
+ 
+   ACCUMULO_POOL_PREFIX("accumulo.pool"),
+   BATCH_WRITER_SEND_POOL("accumulo.pool.batch.writer.send"),
+   BATCH_WRITER_BIN_MUTATIONS_POOL("accumulo.pool.batch.writer.bin.mutations"),
+   BLOOM_LOADER_POOL("accumulo.pool.bloom.loader"),
+   BULK_IMPORT_CLIENT_LOAD_POOL("accumulo.pool.bulk.import.client.bulk.load"),
+   
BULK_IMPORT_CLIENT_BULK_THREADS_POOL("accumulo.pool.bulk.import.client.bulk.threads"),
+   BULK_IMPORT_DIR_MOVE_POOL("accumulo.pool.bulk.dir.move"),
+   
COMPACTION_SERVICE_COMPACTION_PLANNER_POOL("accumulo.pool.compaction.service.compaction.planner"),
+   
COMPACTOR_RUNNING_COMPACTIONS_POOL("accumulo.pool.compactor.running.compactions"),
+   
COMPACTOR_RUNNING_COMPACTION_IDS_POOL("accumulo.pool.compactor.running.compaction.ids"),
+   CONDITIONAL_WRITER_POOL("accumulo.pool.conditional.writer"),
+   
CONDITIONAL_WRITER_CLEANUP_POOL("accumulo.pool.client.context.conditional.writer.cleanup"),
+   
COORDINATOR_FINALIZER_BACKGROUND_POOL("accumulo.pool.compaction.finalizer.background.pool"),
+   
COORDINATOR_FINALIZER_NOTIFIER_POOL("accumulo.pool.compaction.coordinator.compaction.finalizer"),
+   GC_DELETE_POOL("accumulo.pool.gc.threads.delete"),
+   GENERAL_SERVER_POOL("accumulo.pool.general.server"),
+   IMPORT_TABLE_RENAME_POOL("accumulo.pool.import.table.rename"),
+   
INSTANCE_OPS_COMPACTIONS_FINDER_POOL("accumulo.pool.instance.ops.active.compactions.finder"),
+   MANAGER_FATE_POOL("accumulo.pool.manager.fate"),
+   MANAGER_STATUS_POOL("accumulo.pool.manager.status"),
+   
MANAGER_UPGRADE_COORDINATOR_METADATA_POOL("accumulo.pool.manager.upgrade.metadata"),
+   
METADATA_DEFAULT_SPLIT_POOL("accumulo.pool.metadata.tablet.default.splitter"),
+   METADATA_TABLET_MIGRATION_POOL("accumulo.pool.metadata.tablet.migration"),
+   METADATA_TABLET_ASSIGNMENT_POOL("accumulo.pool.metadata.tablet.assignment"),
+   
SCAN_SERVER_TABLET_METADATA_CACHE_POOL("accumulo.pool.scan.server.tablet.metadata.cache"),
+   SCANNER_READ_AHEAD_POOL("accumulo.pool.client.context.scanner.read.ahead"),
+   SCHED_FUTURE_CHECKER_POOL("accumulo.pool.scheduled.future.checker"),
 -  SPLIT_POOL("accumulo.pool.table.ops.add.splits"),
++  SPLIT_START_POOL("accumulo.pool.table.ops.add.splits.start"),
++  SPLIT_WAIT_POOL("accumulo.pool.table.ops.add.splits.wait"),
+   TABLET_ASSIGNMENT_POOL("accumulo.pool.tablet.assignment.pool"),
+   TSERVER_ASSIGNMENT_POOL("accumulo.pool.tserver.assignment"),
+   TSERVER_MIGRATIONS_POOL("accumulo.pool.tserver.migrations"),
+   TSERVER_MINOR_COMPACTOR_POOL("accumulo.pool.tserver.minor.compactor"),
+   
TSERVER_SUMMARY_FILE_RETRIEVER_POOL("accumulo.pool.tserver.summary.file.retriever.pool"),
+   TSERVER_SUMMARY_PARTITION_POOL("accumulo.pool.tserver.summary.partition"),
+   TSERVER_SUMMARY_REMOTE_POOL("accumulo.pool.tserver.summary.remote"),
+   TSERVER_SUMMARY_RETRIEVAL_POOL("accumulo.pool.tserver.summary.retrieval"),
+   TSERVER_TABLET_MIGRATION_POOL("accumulo.pool.tserver.tablet.migration"),
+   TSERVER_WAL_CREATOR_POOL("accumulo.pool.tserver.wal.creator"),
+   
TSERVER_WAL_SORT_CONCURRENT_POOL("accumulo.pool.tserver.wal.sort.concurrent"),
+   UTILITY_CHECK_FILE_TASKS("accumulo.pool.util.check.file.tasks"),
+   
UTILITY_VERIFY_TABLET_ASSIGNMENTS("accumulo.pool.util.check.tablet.servers");
+ 
+   public final String poolName;
+ 
+   ThreadPoolNames(String poolName) {
+     this.poolName = poolName;
+   }
+ }
diff --cc server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java
index 3b5b2fbbee,765065d2b8..cad6ff2b09
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java
@@@ -105,7 -117,7 +107,7 @@@ class BulkImportMove extends ManagerRep
        oldToNewMap.put(originalPath, newPath);
      }
      try {
-       fs.bulkRename(oldToNewMap, workerCount, "bulkDir move", fateId);
 -      fs.bulkRename(oldToNewMap, workerCount, 
BULK_IMPORT_DIR_MOVE_POOL.poolName, fmtTid);
++      fs.bulkRename(oldToNewMap, workerCount, 
BULK_IMPORT_DIR_MOVE_POOL.poolName, fateId);
      } catch (IOException ioe) {
        throw new 
AcceptableThriftTableOperationException(bulkInfo.tableId.canonical(), null,
            TableOperation.BULK_IMPORT, TableOperationExceptionType.OTHER,
diff --cc server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/MoveExportedFiles.java
index d6a328405a,1bc30fc73c..12adab912a
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/MoveExportedFiles.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/MoveExportedFiles.java
@@@ -110,7 -114,7 +112,7 @@@ class MoveExportedFiles extends Manager
        }
      }
      try {
-       fs.bulkRename(oldToNewPaths, workerCount, "importtable rename", fateId);
 -      fs.bulkRename(oldToNewPaths, workerCount, 
IMPORT_TABLE_RENAME_POOL.poolName, fmtTid);
++      fs.bulkRename(oldToNewPaths, workerCount, 
IMPORT_TABLE_RENAME_POOL.poolName, fateId);
      } catch (IOException ioe) {
        throw new 
AcceptableThriftTableOperationException(tableInfo.tableId.canonical(), null,
            TableOperation.IMPORT, TableOperationExceptionType.OTHER, 
ioe.getCause().getMessage());
diff --cc server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
index 2f4f7dd2ff,a463707fec..14d5fa0dfe
--- a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
@@@ -19,7 -19,7 +19,8 @@@
  package org.apache.accumulo.manager.upgrade;
  
  import static java.util.concurrent.TimeUnit.SECONDS;
+ import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_UPGRADE_COORDINATOR_METADATA_POOL;
 +import static 
org.apache.accumulo.server.AccumuloDataVersion.METADATA_FILE_JSON_ENCODING;
  import static 
org.apache.accumulo.server.AccumuloDataVersion.REMOVE_DEPRECATIONS_FOR_VERSION_3;
  import static 
org.apache.accumulo.server.AccumuloDataVersion.ROOT_TABLET_META_CHANGES;
  
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
index 2edb357988,f684b72291..c8516d103d
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
@@@ -253,12 -257,10 +254,12 @@@ public class ScanServer extends Abstrac
            "Tablet metadata cache refresh percentage is '%s' but must be less 
than 1",
            cacheRefreshPercentage);
  
-       tmCacheExecutor = 
context.threadPools().getPoolBuilder("scanServerTmCache").numCoreThreads(8)
-           .enableThreadPoolMetrics().build();
+       tmCacheExecutor = 
context.threadPools().getPoolBuilder(SCAN_SERVER_TABLET_METADATA_CACHE_POOL)
+           .numCoreThreads(8).enableThreadPoolMetrics().build();
 -      var builder = Caffeine.newBuilder().expireAfterWrite(cacheExpiration, 
TimeUnit.MILLISECONDS)
 -          
.scheduler(Scheduler.systemScheduler()).executor(tmCacheExecutor).recordStats();
 +      var builder =
 +          
context.getCaches().createNewBuilder(CacheName.SCAN_SERVER_TABLET_METADATA, 
true)
 +              .expireAfterWrite(cacheExpiration, TimeUnit.MILLISECONDS)
 +              
.scheduler(Scheduler.systemScheduler()).executor(tmCacheExecutor).recordStats();
        if (cacheRefreshPercentage > 0) {
          // Compute the refresh time as a percentage of the expiration time
          // Cache hits after this time, but before expiration, will trigger a 
background
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index cfa95cd269,46a96e6b5c..1ee36fac51
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@@ -23,6 -23,17 +23,15 @@@ import static java.util.Objects.require
  import static java.util.concurrent.TimeUnit.MILLISECONDS;
  import static java.util.concurrent.TimeUnit.SECONDS;
  import static java.util.stream.Collectors.toUnmodifiableMap;
+ import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.ACCUMULO_POOL_PREFIX;
 -import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.METADATA_DEFAULT_SPLIT_POOL;
+ import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.METADATA_TABLET_ASSIGNMENT_POOL;
+ import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.METADATA_TABLET_MIGRATION_POOL;
 -import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.SPLIT_POOL;
+ import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.TABLET_ASSIGNMENT_POOL;
+ import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_MINOR_COMPACTOR_POOL;
+ import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_FILE_RETRIEVER_POOL;
+ import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_PARTITION_POOL;
+ import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_REMOTE_POOL;
+ import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_TABLET_MIGRATION_POOL;
  
  import java.io.IOException;
  import java.util.ArrayList;
@@@ -301,14 -316,21 +312,14 @@@ public class TabletServerResourceManage
      }
  
      minorCompactionThreadPool = 
ThreadPools.getServerThreadPools().createExecutorService(acuConf,
 -        Property.TSERV_MINC_MAXCONCURRENT, true);
 +        Property.TSERV_MINC_MAXCONCURRENT, enableMetrics);
      modifyThreadPoolSizesAtRuntime(
          () -> 
context.getConfiguration().getCount(Property.TSERV_MINC_MAXCONCURRENT),
-         "minor compactor", minorCompactionThreadPool);
+         TSERVER_MINOR_COMPACTOR_POOL.poolName, minorCompactionThreadPool);
  
 -    splitThreadPool = 
ThreadPools.getServerThreadPools().getPoolBuilder(SPLIT_POOL)
 -        .numCoreThreads(0).numMaxThreads(1).withTimeOut(1, SECONDS).build();
 -
 -    defaultSplitThreadPool =
 -        
ThreadPools.getServerThreadPools().getPoolBuilder(METADATA_DEFAULT_SPLIT_POOL)
 -            .numCoreThreads(0).numMaxThreads(1).withTimeOut(60, 
SECONDS).build();
 -
      defaultMigrationPool =
-         ThreadPools.getServerThreadPools().getPoolBuilder("metadata tablet 
migration")
+         
ThreadPools.getServerThreadPools().getPoolBuilder(METADATA_TABLET_MIGRATION_POOL)
 -            .numCoreThreads(0).numMaxThreads(1).withTimeOut(60, 
SECONDS).build();
 +            .numCoreThreads(0).numMaxThreads(1).withTimeOut(60L, 
SECONDS).build();
  
      migrationPool = 
ThreadPools.getServerThreadPools().createExecutorService(acuConf,
          Property.TSERV_MIGRATE_MAXCONCURRENT, enableMetrics);
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
index 030720c93d,ec86d82a25..1fdae2890e
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
@@@ -291,31 -291,13 +292,31 @@@ public class LogSorter 
      }
    }
  
 -  public void startWatchingForRecoveryLogs() throws KeeperException, 
InterruptedException {
 -    int threadPoolSize = 
this.conf.getCount(Property.TSERV_WAL_SORT_MAX_CONCURRENT);
 +  /**
 +   * Sort any logs that need sorting in the current thread.
 +   *
 +   * @return The time in millis when the next check can be done.
 +   */
 +  public long sortLogsIfNeeded() throws KeeperException, InterruptedException 
{
 +    DistributedWorkQueue dwq = new DistributedWorkQueue(
 +        context.getZooKeeperRoot() + Constants.ZRECOVERY, sortedLogConf, 
context);
 +    dwq.processExistingWork(new LogProcessor(), 
MoreExecutors.newDirectExecutorService(), 1, false);
 +    return System.currentTimeMillis() + dwq.getCheckInterval();
 +  }
 +
 +  /**
 +   * Sort any logs that need sorting in a ThreadPool using
 +   * {@link Property#TSERV_WAL_SORT_MAX_CONCURRENT} threads. This method will start a background
 +   * thread to look for log sorting work in the future that will be processed by the
 +   * ThreadPoolExecutor
 +   */
 +  public void startWatchingForRecoveryLogs(int threadPoolSize)
 +      throws KeeperException, InterruptedException {
      ThreadPoolExecutor threadPool =
-         
ThreadPools.getServerThreadPools().getPoolBuilder(this.getClass().getName())
+         
ThreadPools.getServerThreadPools().getPoolBuilder(TSERVER_WAL_SORT_CONCURRENT_POOL)
              .numCoreThreads(threadPoolSize).enableThreadPoolMetrics().build();
      new DistributedWorkQueue(context.getZooKeeperRoot() + 
Constants.ZRECOVERY, sortedLogConf,
 -        context).startProcessing(new LogProcessor(), threadPool);
 +        context).processExistingAndFuture(new LogProcessor(), threadPool);
    }
  
    public List<RecoveryStatus> getLogSorts() {


Reply via email to