This is an automated email from the ASF dual-hosted git repository.

ddanielr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 9ec22b4b03e979ca5c238a3dd123c8a7fc1dd3db
Merge: 11b1f48f03 b90ca95323
Author: Daniel Roberts <ddani...@gmail.com>
AuthorDate: Sun Jul 28 23:08:53 2024 +0000

    Merge branch '2.1'

 assemble/bin/accumulo-cluster                      |  2 +-
 .../accumulo/core/clientImpl/ClientContext.java    | 10 ++--
 .../core/clientImpl/ConditionalWriterImpl.java     |  3 +-
 .../core/clientImpl/InstanceOperationsImpl.java    |  3 +-
 .../core/clientImpl/TableOperationsImpl.java       |  3 +-
 .../core/clientImpl/TabletServerBatchReader.java   |  8 ++-
 .../core/clientImpl/TabletServerBatchWriter.java   |  8 ++-
 .../accumulo/core/clientImpl/bulk/BulkImport.java  | 12 ++--
 .../accumulo/core/file/BloomFilterLayer.java       |  3 +-
 .../util/compaction/ExternalCompactionUtil.java    |  7 ++-
 .../core/util/threads/ThreadPoolNames.java         | 70 ++++++++++++++++++++++
 .../accumulo/core/util/threads/ThreadPools.java    | 60 ++++++++++++++-----
 .../core/file/rfile/MultiThreadedRFileTest.java    |  2 +-
 .../threads/ThreadPoolExecutorBuilderTest.java     | 14 ++---
 .../conf/store/impl/PropCacheCaffeineImpl.java     |  4 +-
 .../server/conf/store/impl/PropStoreWatcher.java   |  2 +-
 .../accumulo/server/problems/ProblemReports.java   |  2 +-
 .../apache/accumulo/server/rpc/TServerUtils.java   | 12 ++--
 .../server/util/RemoveEntriesForMissingFiles.java  |  6 +-
 .../server/util/VerifyTabletAssignments.java       |  6 +-
 .../server/conf/store/impl/ReadyMonitorTest.java   |  2 +-
 .../accumulo/coordinator/CompactionFinalizer.java  |  6 +-
 .../manager/tableOps/bulkVer2/BulkImportMove.java  |  4 +-
 .../tableOps/tableImport/MoveExportedFiles.java    |  4 +-
 .../manager/upgrade/UpgradeCoordinator.java        |  6 +-
 .../org/apache/accumulo/tserver/ScanServer.java    |  5 +-
 .../tserver/TabletServerResourceManager.java       | 60 +++++++++++--------
 .../tserver/compactions/CompactionService.java     |  6 +-
 .../compactions/InternalCompactionExecutor.java    |  8 ++-
 .../org/apache/accumulo/tserver/log/LogSorter.java |  3 +-
 .../accumulo/tserver/log/TabletServerLogger.java   |  3 +-
 .../accumulo/test/BalanceWithOfflineTableIT.java   |  4 +-
 .../test/functional/BatchWriterFlushIT.java        |  2 +-
 33 files changed, 252 insertions(+), 98 deletions(-)

diff --cc assemble/bin/accumulo-cluster
index 5c4c50af52,392927c4ac..aee3ad2b4b
--- a/assemble/bin/accumulo-cluster
+++ b/assemble/bin/accumulo-cluster
@@@ -244,10 -252,10 +244,10 @@@ function stop_sservers() 
      G="SSERVER_HOSTS_${group}"
      if [[ -n ${!G} ]]; then
        for sserver in ${!G}; do
 -        stop_service "$sserver" sserver "-g" "$group"
 +        stop_service "$sserver" sserver "-o" "sserver.group=$group"
        done
      else
-       echo "${queue} is not a valid resource group ...exiting"
+       echo "${group} is not a valid resource group ...exiting"
      fi
    done
  }
diff --cc 
core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
index 654135a852,ceaf6901a5..00ebbeb363
--- 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
@@@ -22,6 -21,8 +22,7 @@@ import static com.google.common.util.co
  import static java.nio.charset.StandardCharsets.UTF_8;
  import static java.util.concurrent.TimeUnit.MILLISECONDS;
  import static java.util.concurrent.TimeUnit.SECONDS;
 -import static 
org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly;
+ import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.CONDITIONAL_WRITER_POOL;
  
  import java.nio.ByteBuffer;
  import java.util.ArrayList;
@@@ -375,9 -378,9 +376,9 @@@ public class ConditionalWriterImpl impl
      this.config = config;
      this.context = context;
      this.auths = config.getAuthorizations();
 -    this.ve = new VisibilityEvaluator(config.getAuthorizations());
 +    this.accessEvaluator = 
AccessEvaluator.of(config.getAuthorizations().toAccessAuthorizations());
      this.threadPool = context.threadPools().createScheduledExecutorService(
-         config.getMaxWriteThreads(), this.getClass().getSimpleName());
+         config.getMaxWriteThreads(), CONDITIONAL_WRITER_POOL.poolName);
      this.locator = new SyncingTabletLocator(context, tableId);
      this.serverQueues = new HashMap<>();
      this.tableId = tableId;
diff --cc 
core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
index 215f7c6214,8c73dab8e5..eeb921d7ca
--- 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
@@@ -25,8 -28,8 +25,9 @@@ import static org.apache.accumulo.core.
  import static org.apache.accumulo.core.rpc.ThriftUtil.createTransport;
  import static org.apache.accumulo.core.rpc.ThriftUtil.getClient;
  import static org.apache.accumulo.core.rpc.ThriftUtil.returnClient;
+ import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.INSTANCE_OPS_COMPACTIONS_FINDER_POOL;
  
 +import java.time.Duration;
  import java.util.ArrayList;
  import java.util.Collections;
  import java.util.ConcurrentModificationException;
diff --cc 
core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
index 6b94fd81cf,c3b51237b6..91a1248c97
--- 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
@@@ -27,9 -27,10 +27,10 @@@ import static java.util.concurrent.Time
  import static java.util.stream.Collectors.toSet;
  import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
  import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
 -import static 
org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly;
 +import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
  import static org.apache.accumulo.core.util.Validators.EXISTING_TABLE_NAME;
  import static org.apache.accumulo.core.util.Validators.NEW_TABLE_NAME;
+ import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.SPLIT_POOL;
  
  import java.io.BufferedReader;
  import java.io.FileNotFoundException;
diff --cc core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
index fd8d4cb6dd,0801371d48..2cdd3f213e
--- a/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
@@@ -19,7 -19,7 +19,8 @@@
  package org.apache.accumulo.core.file;
  
  import static java.util.concurrent.TimeUnit.SECONDS;
 +import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
+ import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.BLOOM_LOADER_POOL;
  
  import java.io.DataInputStream;
  import java.io.DataOutputStream;
diff --cc 
core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
index bbcc3f1529,b3e4a99a7c..c993cf4cb1
--- 
a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
@@@ -18,6 -18,10 +18,9 @@@
   */
  package org.apache.accumulo.core.util.compaction;
  
 -import static java.nio.charset.StandardCharsets.UTF_8;
+ import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.COMPACTOR_RUNNING_COMPACTIONS_POOL;
+ import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.COMPACTOR_RUNNING_COMPACTION_IDS_POOL;
+ 
  import java.util.ArrayList;
  import java.util.Collection;
  import java.util.HashMap;
diff --cc 
core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java
index 0000000000,bdebd03b2d..a27ae864ea
mode 000000,100644..100644
--- 
a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java
@@@ -1,0 -1,78 +1,70 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   https://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.accumulo.core.util.threads;
+ 
+ public enum ThreadPoolNames {
+ 
+   ACCUMULO_POOL_PREFIX("accumulo.pool"),
+   BATCH_WRITER_SEND_POOL("accumulo.pool.batch.writer.send"),
+   BATCH_WRITER_BIN_MUTATIONS_POOL("accumulo.pool.batch.writer.bin.mutations"),
+   BLOOM_LOADER_POOL("accumulo.pool.bloom.loader"),
+   BULK_IMPORT_CLIENT_LOAD_POOL("accumulo.pool.bulk.import.client.bulk.load"),
+   
BULK_IMPORT_CLIENT_BULK_THREADS_POOL("accumulo.pool.bulk.import.client.bulk.threads"),
+   BULK_IMPORT_DIR_MOVE_POOL("accumulo.pool.bulk.dir.move"),
 -  
COMPACTION_COORDINATOR_SUMMARY_POOL("accumulo.pool.compaction.summary.gatherer"),
+   
COMPACTION_SERVICE_COMPACTION_PLANNER_POOL("accumulo.pool.compaction.service.compaction.planner"),
+   
COMPACTOR_RUNNING_COMPACTIONS_POOL("accumulo.pool.compactor.running.compactions"),
+   
COMPACTOR_RUNNING_COMPACTION_IDS_POOL("accumulo.pool.compactor.running.compaction.ids"),
+   CONDITIONAL_WRITER_POOL("accumulo.pool.conditional.writer"),
+   
CONDITIONAL_WRITER_CLEANUP_POOL("accumulo.pool.client.context.conditional.writer.cleanup"),
+   
COORDINATOR_FINALIZER_BACKGROUND_POOL("accumulo.pool.compaction.finalizer.background.pool"),
+   
COORDINATOR_FINALIZER_NOTIFIER_POOL("accumulo.pool.compaction.coordinator.compaction.finalizer"),
+   GC_DELETE_POOL("accumulo.pool.gc.threads.delete"),
+   GENERAL_SERVER_POOL("accumulo.pool.general.server"),
 -  GENERAL_SERVER_SIMPLETIMER_POOL("accumulo.pool.general.server.simpletimer"),
+   IMPORT_TABLE_RENAME_POOL("accumulo.pool.import.table.rename"),
+   
INSTANCE_OPS_COMPACTIONS_FINDER_POOL("accumulo.pool.instance.ops.active.compactions.finder"),
 -  MANAGER_BULK_IMPORT_POOL("accumulo.pool.manager.bulk.import"),
+   MANAGER_FATE_POOL("accumulo.pool.manager.fate"),
 -  MANAGER_RENAME_POOL("accumulo.pool.manager.rename"),
+   MANAGER_STATUS_POOL("accumulo.pool.manager.status"),
+   
MANAGER_UPGRADE_COORDINATOR_METADATA_POOL("accumulo.pool.manager.upgrade.metadata"),
+   
METADATA_DEFAULT_SPLIT_POOL("accumulo.pool.metadata.tablet.default.splitter"),
+   METADATA_TABLET_MIGRATION_POOL("accumulo.pool.metadata.tablet.migration"),
+   METADATA_TABLET_ASSIGNMENT_POOL("accumulo.pool.metadata.tablet.assignment"),
 -  REPLICATION_WORKER_POOL("accumulo.pool.replication.worker"),
 -  SCAN_POOL("accumulo.pool.scan"),
+   
SCAN_SERVER_TABLET_METADATA_CACHE_POOL("accumulo.pool.scan.server.tablet.metadata.cache"),
+   SCANNER_READ_AHEAD_POOL("accumulo.pool.client.context.scanner.read.ahead"),
+   SCHED_FUTURE_CHECKER_POOL("accumulo.pool.scheduled.future.checker"),
+   SPLIT_POOL("accumulo.pool.table.ops.add.splits"),
+   TABLET_ASSIGNMENT_POOL("accumulo.pool.tablet.assignment.pool"),
+   TSERVER_ASSIGNMENT_POOL("accumulo.pool.tserver.assignment"),
 -  TSERVER_COMPACTION_MINOR_POOL("accumulo.pool.tserver.compaction.minor"),
+   TSERVER_MIGRATIONS_POOL("accumulo.pool.tserver.migrations"),
+   TSERVER_MINOR_COMPACTOR_POOL("accumulo.pool.tserver.minor.compactor"),
+   
TSERVER_SUMMARY_FILE_RETRIEVER_POOL("accumulo.pool.tserver.summary.file.retriever.pool"),
+   TSERVER_SUMMARY_PARTITION_POOL("accumulo.pool.tserver.summary.partition"),
+   TSERVER_SUMMARY_REMOTE_POOL("accumulo.pool.tserver.summary.remote"),
+   TSERVER_SUMMARY_RETRIEVAL_POOL("accumulo.pool.tserver.summary.retrieval"),
+   TSERVER_TABLET_MIGRATION_POOL("accumulo.pool.tserver.tablet.migration"),
+   TSERVER_WAL_CREATOR_POOL("accumulo.pool.tserver.wal.creator"),
+   
TSERVER_WAL_SORT_CONCURRENT_POOL("accumulo.pool.tserver.wal.sort.concurrent"),
 -  TSERVER_WORKQ_POOL("accumulo.pool.tserver.workq"),
+   UTILITY_CHECK_FILE_TASKS("accumulo.pool.util.check.file.tasks"),
+   
UTILITY_VERIFY_TABLET_ASSIGNMENTS("accumulo.pool.util.check.tablet.servers");
+ 
+   public final String poolName;
+ 
+   ThreadPoolNames(String poolName) {
+     this.poolName = poolName;
+   }
+ }
diff --cc 
core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
index 6593237ab1,b2b0bc02db..3914c8da90
--- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
@@@ -21,6 -21,23 +21,18 @@@ package org.apache.accumulo.core.util.t
  import static java.util.concurrent.TimeUnit.MILLISECONDS;
  import static java.util.concurrent.TimeUnit.MINUTES;
  import static java.util.concurrent.TimeUnit.SECONDS;
+ import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.ACCUMULO_POOL_PREFIX;
+ import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.GC_DELETE_POOL;
+ import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.GENERAL_SERVER_POOL;
 -import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.GENERAL_SERVER_SIMPLETIMER_POOL;
 -import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_BULK_IMPORT_POOL;
+ import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_FATE_POOL;
 -import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_RENAME_POOL;
+ import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_STATUS_POOL;
 -import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.REPLICATION_WORKER_POOL;
+ import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.SCHED_FUTURE_CHECKER_POOL;
+ import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_ASSIGNMENT_POOL;
+ import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_MIGRATIONS_POOL;
+ import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_MINOR_COMPACTOR_POOL;
+ import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_PARTITION_POOL;
+ import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_REMOTE_POOL;
+ import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_RETRIEVAL_POOL;
 -import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_WORKQ_POOL;
  
  import java.lang.Thread.UncaughtExceptionHandler;
  import java.util.Iterator;
@@@ -267,11 -288,28 +279,11 @@@ public class ThreadPools 
        final Property p, boolean emitThreadPoolMetrics) {
      ThreadPoolExecutorBuilder builder;
      switch (p) {
 -      case GENERAL_SIMPLETIMER_THREADPOOL_SIZE:
 -        return createScheduledExecutorService(conf.getCount(p),
 -            GENERAL_SERVER_SIMPLETIMER_POOL.poolName);
        case GENERAL_THREADPOOL_SIZE:
-         return createScheduledExecutorService(conf.getCount(p), 
"GeneralExecutor",
+         return createScheduledExecutorService(conf.getCount(p), 
GENERAL_SERVER_POOL.poolName,
              emitThreadPoolMetrics);
 -      case MANAGER_BULK_THREADPOOL_SIZE:
 -        builder =
 -            
getPoolBuilder(MANAGER_BULK_IMPORT_POOL).numCoreThreads(conf.getCount(p)).withTimeOut(
 -                
conf.getTimeInMillis(Property.MANAGER_BULK_THREADPOOL_TIMEOUT), MILLISECONDS);
 -        if (emitThreadPoolMetrics) {
 -          builder.enableThreadPoolMetrics();
 -        }
 -        return builder.build();
 -      case MANAGER_RENAME_THREADS:
 -        builder = 
getPoolBuilder(MANAGER_RENAME_POOL).numCoreThreads(conf.getCount(p));
 -        if (emitThreadPoolMetrics) {
 -          builder.enableThreadPoolMetrics();
 -        }
 -        return builder.build();
        case MANAGER_FATE_THREADPOOL_SIZE:
-         builder = getPoolBuilder("Repo 
Runner").numCoreThreads(conf.getCount(p));
+         builder = 
getPoolBuilder(MANAGER_FATE_POOL).numCoreThreads(conf.getCount(p));
          if (emitThreadPoolMetrics) {
            builder.enableThreadPoolMetrics();
          }
@@@ -289,9 -327,15 +301,9 @@@
            builder.enableThreadPoolMetrics();
          }
          return builder.build();
 -      case TSERV_WORKQ_THREADS:
 -        builder = 
getPoolBuilder(TSERVER_WORKQ_POOL).numCoreThreads(conf.getCount(p));
 -        if (emitThreadPoolMetrics) {
 -          builder.enableThreadPoolMetrics();
 -        }
 -        return builder.build();
        case TSERV_MINC_MAXCONCURRENT:
-         builder = getPoolBuilder("minor 
compactor").numCoreThreads(conf.getCount(p)).withTimeOut(0L,
-             MILLISECONDS);
+         builder = 
getPoolBuilder(TSERVER_MINOR_COMPACTOR_POOL).numCoreThreads(conf.getCount(p))
+             .withTimeOut(0L, MILLISECONDS);
          if (emitThreadPoolMetrics) {
            builder.enableThreadPoolMetrics();
          }
@@@ -332,7 -376,14 +344,7 @@@
          }
          return builder.build();
        case GC_DELETE_THREADS:
-         return 
getPoolBuilder("deleting").numCoreThreads(conf.getCount(p)).build();
+         return 
getPoolBuilder(GC_DELETE_POOL).numCoreThreads(conf.getCount(p)).build();
 -      case REPLICATION_WORKER_THREADS:
 -        builder = 
getPoolBuilder(REPLICATION_WORKER_POOL).numCoreThreads(conf.getCount(p));
 -        if (emitThreadPoolMetrics) {
 -          builder.enableThreadPoolMetrics();
 -        }
 -        return builder.build();
 -
        default:
          throw new IllegalArgumentException("Unhandled thread pool property: " 
+ p);
      }
diff --cc 
server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
index 84e69d561f,5445d6d3da..a463707fec
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
@@@ -19,8 -19,7 +19,9 @@@
  package org.apache.accumulo.manager.upgrade;
  
  import static java.util.concurrent.TimeUnit.SECONDS;
+ import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_UPGRADE_COORDINATOR_METADATA_POOL;
 +import static 
org.apache.accumulo.server.AccumuloDataVersion.REMOVE_DEPRECATIONS_FOR_VERSION_3;
 +import static 
org.apache.accumulo.server.AccumuloDataVersion.ROOT_TABLET_META_CHANGES;
  
  import java.io.IOException;
  import java.util.Collections;
diff --cc 
server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
index 9840df50cf,18cb23b37b..f684b72291
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
@@@ -18,7 -18,9 +18,8 @@@
   */
  package org.apache.accumulo.tserver;
  
 -import static java.nio.charset.StandardCharsets.UTF_8;
 -import static 
org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly;
 +import static 
com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+ import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.SCAN_SERVER_TABLET_METADATA_CACHE_POOL;
  
  import java.io.IOException;
  import java.io.UncheckedIOException;
diff --cc 
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index 2bb80309a1,655b54fdbd..46a96e6b5c
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@@ -23,6 -22,18 +23,17 @@@ import static java.util.Objects.require
  import static java.util.concurrent.TimeUnit.MILLISECONDS;
  import static java.util.concurrent.TimeUnit.SECONDS;
  import static java.util.stream.Collectors.toUnmodifiableMap;
 -import static 
org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly;
+ import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.ACCUMULO_POOL_PREFIX;
+ import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.METADATA_DEFAULT_SPLIT_POOL;
+ import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.METADATA_TABLET_ASSIGNMENT_POOL;
+ import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.METADATA_TABLET_MIGRATION_POOL;
+ import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.SPLIT_POOL;
+ import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.TABLET_ASSIGNMENT_POOL;
+ import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_MINOR_COMPACTOR_POOL;
+ import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_FILE_RETRIEVER_POOL;
+ import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_PARTITION_POOL;
+ import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_REMOTE_POOL;
+ import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_TABLET_MIGRATION_POOL;
  
  import java.io.IOException;
  import java.util.ArrayList;
diff --cc 
server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
index 2b5a6c6135,32a249e7b6..ec86d82a25
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
@@@ -291,9 -294,11 +292,9 @@@ public class LogSorter 
    }
  
    public void startWatchingForRecoveryLogs() throws KeeperException, 
InterruptedException {
 -    @SuppressWarnings("deprecation")
 -    int threadPoolSize = this.conf.getCount(this.conf
 -        .resolve(Property.TSERV_WAL_SORT_MAX_CONCURRENT, 
Property.TSERV_RECOVERY_MAX_CONCURRENT));
 +    int threadPoolSize = 
this.conf.getCount(Property.TSERV_WAL_SORT_MAX_CONCURRENT);
      ThreadPoolExecutor threadPool =
-         
ThreadPools.getServerThreadPools().getPoolBuilder(this.getClass().getName())
+         
ThreadPools.getServerThreadPools().getPoolBuilder(TSERVER_WAL_SORT_CONCURRENT_POOL)
              .numCoreThreads(threadPoolSize).enableThreadPoolMetrics().build();
      new DistributedWorkQueue(context.getZooKeeperRoot() + 
Constants.ZRECOVERY, sortedLogConf,
          context).startProcessing(new LogProcessor(), threadPool);
diff --cc 
server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index 54dad2b243,6757d276ee..544f53acaa
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@@ -257,22 -263,60 +258,22 @@@ public class TabletServerLogger 
      if (nextLogMaker != null) {
        return;
      }
-     nextLogMaker = ThreadPools.getServerThreadPools().getPoolBuilder("WALog 
creator")
+     nextLogMaker = 
ThreadPools.getServerThreadPools().getPoolBuilder(TSERVER_WAL_CREATOR_POOL)
 -        .numCoreThreads(1).build();
 -    nextLogMaker.execute(new Runnable() {
 -      @Override
 -      public void run() {
 -        final ServerResources conf = tserver.getServerConfig();
 -        final VolumeManager fs = conf.getVolumeManager();
 -        while (!nextLogMaker.isShutdown()) {
 -          log.debug("Creating next WAL");
 -          DfsLogger alog = null;
 -
 -          try {
 -            alog = new DfsLogger(tserver.getContext(), conf, syncCounter, 
flushCounter);
 -            alog.open(tserver.getClientAddressString());
 -          } catch (Exception t) {
 -            log.error("Failed to open WAL", t);
 -            // the log is not advertised in ZK yet, so we can just delete it 
if it exists
 -            if (alog != null) {
 -              try {
 -                alog.close();
 -              } catch (Exception e) {
 -                log.error("Failed to close WAL after it failed to open", e);
 -              }
 -
 -              try {
 -                Path path = alog.getPath();
 -                if (fs.exists(path)) {
 -                  fs.delete(path);
 -                }
 -              } catch (Exception e) {
 -                log.warn("Failed to delete a WAL that failed to open", e);
 -              }
 -            }
 -
 -            try {
 -              nextLog.offer(t, 12, TimeUnit.HOURS);
 -            } catch (InterruptedException ex) {
 -              // ignore
 -            }
 -
 -            continue;
 -          }
 -
 -          String fileName = alog.getFileName();
 -          log.debug("Created next WAL {}", fileName);
 -
 -          try {
 -            tserver.addNewLogMarker(alog);
 -          } catch (Exception t) {
 -            log.error("Failed to add new WAL marker for " + fileName, t);
 +        .numCoreThreads(1).enableThreadPoolMetrics().build();
 +    nextLogMaker.execute(() -> {
 +      final VolumeManager fs = tserver.getVolumeManager();
 +      while (!nextLogMaker.isShutdown()) {
 +        log.debug("Creating next WAL");
 +        DfsLogger alog = null;
  
 +        try {
 +          alog = DfsLogger.createNew(tserver.getContext(), syncCounter, 
flushCounter,
 +              tserver.getClientAddressString());
 +        } catch (Exception t) {
 +          log.error("Failed to open WAL", t);
 +          // the log is not advertised in ZK yet, so we can just delete it if 
it exists
 +          if (alog != null) {
              try {
 -              // Intentionally not deleting walog because it may have been 
advertised in ZK. See
 -              // #949
                alog.close();
              } catch (Exception e) {
                log.error("Failed to close WAL after it failed to open", e);

Reply via email to