This is an automated email from the ASF dual-hosted git repository.

edcoleman pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 0ad96b1dc088070486ccc92a170545a6c91fab7f
Merge: 8b0262d5b3 ec8ae122ed
Author: Ed Coleman <edcole...@apache.org>
AuthorDate: Tue Mar 19 19:02:30 2024 +0000

    Merge remote-tracking branch 'upstream/2.1'

 .../accumulo/core/clientImpl/ClientContext.java    |   9 +-
 .../core/clientImpl/ConditionalWriterImpl.java     |   2 +-
 .../core/clientImpl/InstanceOperationsImpl.java    |   4 +-
 .../core/clientImpl/TableOperationsImpl.java       |   3 +-
 .../core/clientImpl/TabletServerBatchReader.java   |   5 +-
 .../core/clientImpl/TabletServerBatchWriter.java   |   8 +-
 .../accumulo/core/clientImpl/bulk/BulkImport.java  |   8 +-
 .../accumulo/core/file/BloomFilterLayer.java       |   5 +-
 .../file/blockfile/cache/lru/LruBlockCache.java    |   4 +-
 .../blockfile/cache/tinylfu/TinyLfuBlockCache.java |   2 +-
 .../util/compaction/ExternalCompactionUtil.java    |  10 +-
 .../accumulo/core/util/threads/ThreadPools.java    | 386 +++++++++++++--------
 .../core/file/rfile/MultiThreadedRFileTest.java    |   7 +-
 .../threads/ThreadPoolExecutorBuilderTest.java     |  79 +++++
 .../server/conf/ServerConfigurationFactory.java    |   4 +-
 .../conf/store/impl/PropCacheCaffeineImpl.java     |   7 +-
 .../server/conf/store/impl/PropStoreWatcher.java   |   5 +-
 .../accumulo/server/fs/VolumeManagerImpl.java      |   4 +-
 .../accumulo/server/problems/ProblemReports.java   |   7 +-
 .../apache/accumulo/server/rpc/TServerUtils.java   |   7 +-
 .../server/util/RemoveEntriesForMissingFiles.java  |   4 +-
 .../server/util/VerifyTabletAssignments.java       |   5 +-
 .../server/conf/store/impl/ReadyMonitorTest.java   |   4 +-
 .../coordinator/CompactionCoordinator.java         |   4 +-
 .../accumulo/coordinator/CompactionFinalizer.java  |  11 +-
 .../main/java/org/apache/accumulo/gc/GCRun.java    |   2 +-
 .../accumulo/manager/metrics/fate/FateMetrics.java |   4 +-
 .../accumulo/manager/recovery/RecoveryManager.java |   4 +-
 .../manager/upgrade/UpgradeCoordinator.java        |   8 +-
 .../tserver/TabletServerResourceManager.java       |  31 +-
 .../tserver/compactions/CompactionService.java     |   6 +-
 .../compactions/InternalCompactionExecutor.java    |   9 +-
 .../org/apache/accumulo/tserver/log/LogSorter.java |   5 +-
 .../accumulo/tserver/log/TabletServerLogger.java   |   4 +-
 .../metrics/CompactionExecutorsMetrics.java        |   2 +-
 .../accumulo/test/BalanceWithOfflineTableIT.java   |   5 +-
 .../test/functional/BatchWriterFlushIT.java        |   2 +-
 .../accumulo/test/functional/BulkFailureIT.java    |   4 +-
 38 files changed, 433 insertions(+), 247 deletions(-)

diff --cc 
core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
index 12adfc5c98,147f19e5dd..334fb46a53
--- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
@@@ -243,46 -264,105 +261,82 @@@ public class ThreadPools 
     * @return ExecutorService impl
     * @throws IllegalArgumentException if property is not handled
     */
 -  @SuppressWarnings("deprecation")
    public ThreadPoolExecutor createExecutorService(final AccumuloConfiguration 
conf,
        final Property p, boolean emitThreadPoolMetrics) {
- 
+     ThreadPoolExecutorBuilder builder;
      switch (p) {
 -      case GENERAL_SIMPLETIMER_THREADPOOL_SIZE:
 -        return createScheduledExecutorService(conf.getCount(p), 
"SimpleTimer");
        case GENERAL_THREADPOOL_SIZE:
          return createScheduledExecutorService(conf.getCount(p), 
"GeneralExecutor",
              emitThreadPoolMetrics);
 -      case MANAGER_BULK_THREADPOOL_SIZE:
 -        builder = getPoolBuilder("bulk 
import").numCoreThreads(conf.getCount(p)).withTimeOut(
 -            conf.getTimeInMillis(Property.MANAGER_BULK_THREADPOOL_TIMEOUT), 
MILLISECONDS);
 -        if (emitThreadPoolMetrics) {
 -          builder.enableThreadPoolMetrics();
 -        }
 -        return builder.build();
 -      case MANAGER_RENAME_THREADS:
 -        builder = getPoolBuilder("bulk 
move").numCoreThreads(conf.getCount(p));
 -        if (emitThreadPoolMetrics) {
 -          builder.enableThreadPoolMetrics();
 -        }
 -        return builder.build();
        case MANAGER_FATE_THREADPOOL_SIZE:
-         return createFixedThreadPool(conf.getCount(p), "Repo Runner", 
emitThreadPoolMetrics);
+         builder = getPoolBuilder("Repo 
Runner").numCoreThreads(conf.getCount(p));
+         if (emitThreadPoolMetrics) {
+           builder.enableThreadPoolMetrics();
+         }
+         return builder.build();
        case MANAGER_STATUS_THREAD_POOL_SIZE:
+         builder = getPoolBuilder("GatherTableInformation");
          int threads = conf.getCount(p);
          if (threads == 0) {
-           return createThreadPool(0, Integer.MAX_VALUE, 60L, SECONDS, 
"GatherTableInformation",
-               new SynchronousQueue<>(), emitThreadPoolMetrics);
+           
builder.numCoreThreads(0).numMaxThreads(Integer.MAX_VALUE).withTimeOut(60L, 
SECONDS)
+               .withQueue(new SynchronousQueue<>());
          } else {
-           return createFixedThreadPool(threads, "GatherTableInformation", 
emitThreadPoolMetrics);
+           builder.numCoreThreads(threads);
          }
+         if (emitThreadPoolMetrics) {
+           builder.enableThreadPoolMetrics();
+         }
+         return builder.build();
        case TSERV_WORKQ_THREADS:
-         return createFixedThreadPool(conf.getCount(p), "distributed work 
queue",
-             emitThreadPoolMetrics);
+         builder = getPoolBuilder("distributed work 
queue").numCoreThreads(conf.getCount(p));
+         if (emitThreadPoolMetrics) {
+           builder.enableThreadPoolMetrics();
+         }
+         return builder.build();
        case TSERV_MINC_MAXCONCURRENT:
-         return createFixedThreadPool(conf.getCount(p), 0L, MILLISECONDS, 
"minor compactor",
-             emitThreadPoolMetrics);
+         builder = getPoolBuilder("minor 
compactor").numCoreThreads(conf.getCount(p)).withTimeOut(0L,
+             MILLISECONDS);
+         if (emitThreadPoolMetrics) {
+           builder.enableThreadPoolMetrics();
+         }
+         return builder.build();
        case TSERV_MIGRATE_MAXCONCURRENT:
-         return createFixedThreadPool(conf.getCount(p), 0L, MILLISECONDS, 
"tablet migration",
-             emitThreadPoolMetrics);
+         builder = getPoolBuilder("tablet 
migration").numCoreThreads(conf.getCount(p))
+             .withTimeOut(0L, MILLISECONDS);
+         if (emitThreadPoolMetrics) {
+           builder.enableThreadPoolMetrics();
+         }
+         return builder.build();
        case TSERV_ASSIGNMENT_MAXCONCURRENT:
-         return createFixedThreadPool(conf.getCount(p), 0L, MILLISECONDS, 
"tablet assignment",
-             emitThreadPoolMetrics);
+         builder = getPoolBuilder("tablet 
assignment").numCoreThreads(conf.getCount(p))
+             .withTimeOut(0L, MILLISECONDS);
+         if (emitThreadPoolMetrics) {
+           builder.enableThreadPoolMetrics();
+         }
+         return builder.build();
        case TSERV_SUMMARY_RETRIEVAL_THREADS:
-         return createThreadPool(conf.getCount(p), conf.getCount(p), 60, 
SECONDS,
-             "summary file retriever", emitThreadPoolMetrics);
+         builder = getPoolBuilder("summary file 
retriever").numCoreThreads(conf.getCount(p))
+             .withTimeOut(60L, MILLISECONDS);
+         if (emitThreadPoolMetrics) {
+           builder.enableThreadPoolMetrics();
+         }
+         return builder.build();
        case TSERV_SUMMARY_REMOTE_THREADS:
-         return createThreadPool(conf.getCount(p), conf.getCount(p), 60, 
SECONDS, "summary remote",
-             emitThreadPoolMetrics);
+         builder = getPoolBuilder("summary 
remote").numCoreThreads(conf.getCount(p)).withTimeOut(60L,
+             MILLISECONDS);
+         if (emitThreadPoolMetrics) {
+           builder.enableThreadPoolMetrics();
+         }
+         return builder.build();
        case TSERV_SUMMARY_PARTITION_THREADS:
-         return createThreadPool(conf.getCount(p), conf.getCount(p), 60, 
SECONDS,
-             "summary partition", emitThreadPoolMetrics);
+         builder = getPoolBuilder("summary 
partition").numCoreThreads(conf.getCount(p))
+             .withTimeOut(60L, MILLISECONDS);
+         if (emitThreadPoolMetrics) {
+           builder.enableThreadPoolMetrics();
+         }
+         return builder.build();
        case GC_DELETE_THREADS:
-         return createFixedThreadPool(conf.getCount(p), "deleting", 
emitThreadPoolMetrics);
+         return 
getPoolBuilder("deleting").numCoreThreads(conf.getCount(p)).build();
 -      case REPLICATION_WORKER_THREADS:
 -        builder = getPoolBuilder("replication 
task").numCoreThreads(conf.getCount(p));
 -        if (emitThreadPoolMetrics) {
 -          builder.enableThreadPoolMetrics();
 -        }
 -        return builder.build();
 -
        default:
          throw new IllegalArgumentException("Unhandled thread pool property: " 
+ p);
      }
@@@ -460,10 -550,25 +524,23 @@@
     */
    public ScheduledThreadPoolExecutor
        createGeneralScheduledExecutorService(AccumuloConfiguration conf) {
 -    @SuppressWarnings("deprecation")
 -    Property oldProp = Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE;
 -    Property prop = conf.resolve(Property.GENERAL_THREADPOOL_SIZE, oldProp);
 -    return (ScheduledThreadPoolExecutor) createExecutorService(conf, prop, 
true);
 +    return (ScheduledThreadPoolExecutor) createExecutorService(conf,
 +        Property.GENERAL_THREADPOOL_SIZE, true);
    }
  
+   /**
+    * Create a named ScheduledThreadPool. The pool will not be instrumented with additional
+    * metrics. This method should be preferred, especially for short-lived pools.
+    *
+    * @param numThreads number of threads
+    * @param name thread pool name
+    * @return ScheduledThreadPoolExecutor
+    */
+   public ScheduledThreadPoolExecutor createScheduledExecutorService(int 
numThreads,
+       final String name) {
+     return createScheduledExecutorService(numThreads, name, false);
+   }
+ 
    /**
     * Create a named ScheduledThreadPool
     *
diff --cc 
core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
index 8c66bb6ead,ea0c4ceabe..2f1525dc11
--- 
a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
+++ 
b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
@@@ -19,8 -19,7 +19,8 @@@
  package org.apache.accumulo.core.file.rfile;
  
  import static java.util.concurrent.TimeUnit.MILLISECONDS;
- import static java.util.concurrent.TimeUnit.SECONDS;
+ import static java.util.concurrent.TimeUnit.MINUTES;
 +import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
  import static org.junit.jupiter.api.Assertions.assertEquals;
  import static org.junit.jupiter.api.Assertions.assertFalse;
  import static org.junit.jupiter.api.Assertions.assertTrue;
diff --cc 
server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java
index 032165a0ce,2c04fc9e49..2708d21648
--- 
a/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java
@@@ -18,8 -18,8 +18,10 @@@
   */
  package org.apache.accumulo.server.problems;
  
+ import static java.util.concurrent.TimeUnit.SECONDS;
+ 
 +import java.io.IOException;
 +import java.io.UncheckedIOException;
  import java.util.Collections;
  import java.util.EnumMap;
  import java.util.Iterator;
diff --cc 
server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index 685080c5b1,75690df777..7358ce4416
--- 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@@ -144,8 -141,6 +144,8 @@@ public class CompactionCoordinator exte
      super("compaction-coordinator", opts, args);
      aconf = conf == null ? super.getConfiguration() : conf;
      schedExecutor = 
ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(aconf);
-     summariesExecutor = 
ThreadPools.getServerThreadPools().createFixedThreadPool(10,
-         "Compaction Summary Gatherer", false);
++    summariesExecutor = ThreadPools.getServerThreadPools()
++        .getPoolBuilder("Compaction Summary 
Gatherer").numCoreThreads(10).build();
      compactionFinalizer = createCompactionFinalizer(schedExecutor);
      tserverSet = createLiveTServerSet();
      setupSecurity();
diff --cc server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java
index 1939d6e51a,cd1753b67f..4cb6f3b94c
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java
@@@ -283,9 -292,9 +283,9 @@@ public class GCRun implements GarbageCo
      minimizeDeletes(confirmedDeletes, processedDeletes, fs, log);
  
      ExecutorService deleteThreadPool = ThreadPools.getServerThreadPools()
-         .createExecutorService(config, Property.GC_DELETE_THREADS, false);
+         .createExecutorService(config, Property.GC_DELETE_THREADS);
  
 -    final List<Pair<Path,Path>> replacements = 
context.getVolumeReplacements();
 +    final Map<Path,Path> replacements = context.getVolumeReplacements();
  
      for (final GcCandidate delete : confirmedDeletes.values()) {
  
diff --cc 
server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java
index b33a3cfe7a,f405881cb6..cb590a3642
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java
@@@ -70,11 -70,11 +70,11 @@@ public class RecoveryManager 
    public RecoveryManager(Manager manager, long timeToCacheExistsInMillis) {
      this.manager = manager;
      existenceCache =
 -        CacheBuilder.newBuilder().expireAfterWrite(timeToCacheExistsInMillis, 
TimeUnit.MILLISECONDS)
 +        Caffeine.newBuilder().expireAfterWrite(timeToCacheExistsInMillis, 
TimeUnit.MILLISECONDS)
              .maximumWeight(10_000_000).weigher((path, exist) -> 
path.toString().length()).build();
  
-     executor = 
ThreadPools.getServerThreadPools().createScheduledExecutorService(4,
-         "Walog sort starter", false);
+     executor =
+         ThreadPools.getServerThreadPools().createScheduledExecutorService(4, 
"Walog sort starter");
      zooCache = new ZooCache(manager.getContext().getZooReader(), null);
      try {
        List<String> workIDs =
diff --cc 
server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
index 34b362fa2e,11cb713b58..84e69d561f
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
@@@ -18,8 -18,7 +18,9 @@@
   */
  package org.apache.accumulo.manager.upgrade;
  
+ import static java.util.concurrent.TimeUnit.SECONDS;
 +import static 
org.apache.accumulo.server.AccumuloDataVersion.REMOVE_DEPRECATIONS_FOR_VERSION_3;
 +import static 
org.apache.accumulo.server.AccumuloDataVersion.ROOT_TABLET_META_CHANGES;
  
  import java.io.IOException;
  import java.util.Collections;
@@@ -198,16 -194,12 +198,16 @@@ public class UpgradeCoordinator 
          "Not currently in a suitable state to do metadata upgrade %s", 
status);
  
      if (currentVersion < AccumuloDataVersion.get()) {
-       return ThreadPools.getServerThreadPools().createThreadPool(0, 
Integer.MAX_VALUE, 60L,
-           TimeUnit.SECONDS, "UpgradeMetadataThreads", new 
SynchronousQueue<>(), false)
-           .submit(() -> {
+       return 
ThreadPools.getServerThreadPools().getPoolBuilder("UpgradeMetadataThreads")
+           
.numCoreThreads(0).numMaxThreads(Integer.MAX_VALUE).withTimeOut(60L, SECONDS)
+           .withQueue(new SynchronousQueue<>()).build().submit(() -> {
              try {
                for (int v = currentVersion; v < AccumuloDataVersion.get(); 
v++) {
 -                log.info("Upgrading Root from data version {}", v);
 +                log.info("Upgrading Root - current version {} as step towards 
target version {}", v,
 +                    AccumuloDataVersion.get());
 +                var upgrader = upgraders.get(v);
 +                Objects.requireNonNull(upgrader,
 +                    "upgrade root: failed to find root upgrader for version " 
+ currentVersion);
                  upgraders.get(v).upgradeRoot(context);
                }
                setStatus(UpgradeStatus.UPGRADED_ROOT, eventCoordinator);
diff --cc 
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index b0a225295c,3d80f9c1ed..de47b841fd
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@@ -18,9 -18,11 +18,11 @@@
   */
  package org.apache.accumulo.tserver;
  
 +import static 
com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
  import static java.util.Objects.requireNonNull;
+ import static java.util.concurrent.TimeUnit.MILLISECONDS;
+ import static java.util.concurrent.TimeUnit.SECONDS;
  import static java.util.stream.Collectors.toUnmodifiableMap;
 -import static 
org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly;
  
  import java.io.IOException;
  import java.util.ArrayList;
diff --cc 
server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
index 33ce1989e4,6fc396e4f4..2b5a6c6135
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
@@@ -291,9 -293,12 +291,10 @@@ public class LogSorter 
    }
  
    public void startWatchingForRecoveryLogs() throws KeeperException, 
InterruptedException {
 -    @SuppressWarnings("deprecation")
 -    int threadPoolSize = this.conf.getCount(this.conf
 -        .resolve(Property.TSERV_WAL_SORT_MAX_CONCURRENT, 
Property.TSERV_RECOVERY_MAX_CONCURRENT));
 +    int threadPoolSize = 
this.conf.getCount(Property.TSERV_WAL_SORT_MAX_CONCURRENT);
-     ThreadPoolExecutor threadPool = ThreadPools.getServerThreadPools()
-         .createFixedThreadPool(threadPoolSize, this.getClass().getName(), 
true);
+     ThreadPoolExecutor threadPool =
+         
ThreadPools.getServerThreadPools().getPoolBuilder(this.getClass().getName())
+             .numCoreThreads(threadPoolSize).enableThreadPoolMetrics().build();
      new DistributedWorkQueue(context.getZooKeeperRoot() + 
Constants.ZRECOVERY, sortedLogConf,
          context).startProcessing(new LogProcessor(), threadPool);
    }
diff --cc 
server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index dc66adc26a,346b70166c..54dad2b243
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@@ -257,22 -262,60 +257,22 @@@ public class TabletServerLogger 
      if (nextLogMaker != null) {
        return;
      }
-     nextLogMaker =
-         ThreadPools.getServerThreadPools().createFixedThreadPool(1, "WALog 
creator", true);
+     nextLogMaker = ThreadPools.getServerThreadPools().getPoolBuilder("WALog 
creator")
+         .numCoreThreads(1).enableThreadPoolMetrics().build();
 -    nextLogMaker.execute(new Runnable() {
 -      @Override
 -      public void run() {
 -        final ServerResources conf = tserver.getServerConfig();
 -        final VolumeManager fs = conf.getVolumeManager();
 -        while (!nextLogMaker.isShutdown()) {
 -          log.debug("Creating next WAL");
 -          DfsLogger alog = null;
 -
 -          try {
 -            alog = new DfsLogger(tserver.getContext(), conf, syncCounter, 
flushCounter);
 -            alog.open(tserver.getClientAddressString());
 -          } catch (Exception t) {
 -            log.error("Failed to open WAL", t);
 -            // the log is not advertised in ZK yet, so we can just delete it 
if it exists
 -            if (alog != null) {
 -              try {
 -                alog.close();
 -              } catch (Exception e) {
 -                log.error("Failed to close WAL after it failed to open", e);
 -              }
 -
 -              try {
 -                Path path = alog.getPath();
 -                if (fs.exists(path)) {
 -                  fs.delete(path);
 -                }
 -              } catch (Exception e) {
 -                log.warn("Failed to delete a WAL that failed to open", e);
 -              }
 -            }
 -
 -            try {
 -              nextLog.offer(t, 12, TimeUnit.HOURS);
 -            } catch (InterruptedException ex) {
 -              // ignore
 -            }
 -
 -            continue;
 -          }
 -
 -          String fileName = alog.getFileName();
 -          log.debug("Created next WAL {}", fileName);
 -
 -          try {
 -            tserver.addNewLogMarker(alog);
 -          } catch (Exception t) {
 -            log.error("Failed to add new WAL marker for " + fileName, t);
 +    nextLogMaker.execute(() -> {
 +      final VolumeManager fs = tserver.getVolumeManager();
 +      while (!nextLogMaker.isShutdown()) {
 +        log.debug("Creating next WAL");
 +        DfsLogger alog = null;
  
 +        try {
 +          alog = DfsLogger.createNew(tserver.getContext(), syncCounter, 
flushCounter,
 +              tserver.getClientAddressString());
 +        } catch (Exception t) {
 +          log.error("Failed to open WAL", t);
 +          // the log is not advertised in ZK yet, so we can just delete it if 
it exists
 +          if (alog != null) {
              try {
 -              // Intentionally not deleting walog because it may have been 
advertised in ZK. See
 -              // #949
                alog.close();
              } catch (Exception e) {
                log.error("Failed to close WAL after it failed to open", e);
diff --cc 
server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/CompactionExecutorsMetrics.java
index b8cbbc19a1,e3c6bb5150..59c66e1d35
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/CompactionExecutorsMetrics.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/CompactionExecutorsMetrics.java
@@@ -72,8 -72,12 +72,8 @@@ public class CompactionExecutorsMetric
    }
  
    public CompactionExecutorsMetrics() {
 -    startUpdateThread();
 -  }
 -
 -  protected void startUpdateThread() {
      ScheduledExecutorService scheduler = ThreadPools.getServerThreadPools()
-         .createScheduledExecutorService(1, 
"compactionExecutorsMetricsPoller", false);
+         .createScheduledExecutorService(1, 
"compactionExecutorsMetricsPoller");
      Runtime.getRuntime().addShutdownHook(new Thread(scheduler::shutdownNow));
      long minimumRefreshDelay = TimeUnit.SECONDS.toMillis(5);
      
ThreadPools.watchNonCriticalScheduledTask(scheduler.scheduleAtFixedRate(this::update,
diff --cc 
test/src/main/java/org/apache/accumulo/test/functional/BulkFailureIT.java
index e721c34837,98f3e47f29..cb4b62d1d7
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkFailureIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkFailureIT.java
@@@ -107,100 -96,6 +107,100 @@@ public class BulkFailureIT extends Accu
      runTest(tables[1], 22222222L, BulkFailureIT::newLoad);
    }
  
 +  private static Path createNewBulkDir(ServerContext context, VolumeManager 
fs, String sourceDir,
 +      TableId tableId) throws IOException {
 +    Path tableDir = fs.matchingFileSystem(new Path(sourceDir), 
context.getTablesDirs());
 +    if (tableDir == null) {
 +      throw new IOException(
 +          sourceDir + " is not in the same file system as any volume 
configured for Accumulo");
 +    }
 +
 +    Path directory = new Path(tableDir, tableId.canonical());
 +    fs.mkdirs(directory);
 +
 +    // only one should be able to create the lock file
 +    // the purpose of the lock file is to avoid a race
 +    // condition between the call to fs.exists() and
 +    // fs.mkdirs()... if only hadoop had a mkdir() function
 +    // that failed when the dir existed
 +
 +    UniqueNameAllocator namer = context.getUniqueNameAllocator();
 +
 +    while (true) {
 +      Path newBulkDir = new Path(directory, Constants.BULK_PREFIX + 
namer.getNextName());
 +      if (fs.exists(newBulkDir)) { // sanity check
 +        throw new IOException("Dir exist when it should not " + newBulkDir);
 +      }
 +      if (fs.mkdirs(newBulkDir)) {
 +        return newBulkDir;
 +      }
 +
 +      sleepUninterruptibly(3, TimeUnit.SECONDS);
 +    }
 +  }
 +
 +  public static String prepareBulkImport(ServerContext manager, final 
VolumeManager fs, String dir,
 +      TableId tableId, long tid) throws Exception {
 +    final Path bulkDir = createNewBulkDir(manager, fs, dir, tableId);
 +
 +    manager.getAmple().addBulkLoadInProgressFlag(
 +        "/" + bulkDir.getParent().getName() + "/" + bulkDir.getName(), tid);
 +
 +    Path dirPath = new Path(dir);
 +    FileStatus[] dataFiles = fs.listStatus(dirPath);
 +
 +    final UniqueNameAllocator namer = manager.getUniqueNameAllocator();
 +
 +    AccumuloConfiguration serverConfig = manager.getConfiguration();
 +    int numThreads = serverConfig.getCount(Property.MANAGER_RENAME_THREADS);
-     ExecutorService workers =
-         ThreadPools.getServerThreadPools().createFixedThreadPool(numThreads, 
"bulk rename", false);
++    ExecutorService workers = 
ThreadPools.getServerThreadPools().getPoolBuilder("bulk rename")
++        .numCoreThreads(numThreads).build();
 +    List<Future<Exception>> results = new ArrayList<>();
 +
 +    for (FileStatus file : dataFiles) {
 +      final FileStatus fileStatus = file;
 +      results.add(workers.submit(() -> {
 +        try {
 +          String[] sa = fileStatus.getPath().getName().split("\\.");
 +          String extension = "";
 +          if (sa.length > 1) {
 +            extension = sa[sa.length - 1];
 +
 +            if (!FileOperations.getValidExtensions().contains(extension)) {
 +              LOG.warn("{} does not have a valid extension, ignoring", 
fileStatus.getPath());
 +              return null;
 +            }
 +          } else {
 +            LOG.warn("{} does not have any extension, ignoring", 
fileStatus.getPath());
 +            return null;
 +          }
 +
 +          String newName = "I" + namer.getNextName() + "." + extension;
 +          Path newPath = new Path(bulkDir, newName);
 +          try {
 +            fs.rename(fileStatus.getPath(), newPath);
 +            LOG.debug("Moved {} to {}", fileStatus.getPath(), newPath);
 +          } catch (IOException E1) {
 +            LOG.error("Could not move: {} {}", fileStatus.getPath(), 
E1.getMessage());
 +          }
 +
 +        } catch (Exception ex) {
 +          return ex;
 +        }
 +        return null;
 +      }));
 +    }
 +    workers.shutdown();
 +    while (!workers.awaitTermination(1000L, TimeUnit.MILLISECONDS)) {}
 +
 +    for (Future<Exception> ex : results) {
 +      if (ex.get() != null) {
 +        throw ex.get();
 +      }
 +    }
 +    return bulkDir.toString();
 +  }
 +
    /**
     * This test verifies two things. First it ensures that after a bulk 
imported file is compacted
     * that import request are ignored. Second it ensures that after the bulk 
import transaction is

Reply via email to