This is an automated email from the ASF dual-hosted git repository.

ddanielr pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 7b3eeb9444835cadf657219720d5bb613fb1978f
Merge: 102a9744bb 9ec22b4b03
Author: Daniel Roberts <ddani...@gmail.com>
AuthorDate: Sun Jul 28 23:48:50 2024 +0000

    Merge branch 'main' into elasticity

 .../accumulo/core/clientImpl/ClientContext.java    | 10 +--
 .../core/clientImpl/ConditionalWriterImpl.java     |  3 +-
 .../core/clientImpl/InstanceOperationsImpl.java    |  3 +-
 .../core/clientImpl/TableOperationsImpl.java       |  6 +-
 .../core/clientImpl/TabletServerBatchReader.java   |  8 ++-
 .../core/clientImpl/TabletServerBatchWriter.java   |  8 ++-
 .../accumulo/core/clientImpl/bulk/BulkImport.java  | 12 ++--
 .../accumulo/core/file/BloomFilterLayer.java       |  3 +-
 .../util/compaction/ExternalCompactionUtil.java    |  7 ++-
 .../core/util/threads/ThreadPoolNames.java         | 71 ++++++++++++++++++++++
 .../accumulo/core/util/threads/ThreadPools.java    | 60 +++++++++++++-----
 .../core/file/rfile/MultiThreadedRFileTest.java    |  2 +-
 .../threads/ThreadPoolExecutorBuilderTest.java     | 14 ++---
 .../conf/store/impl/PropCacheCaffeineImpl.java     |  4 +-
 .../server/conf/store/impl/PropStoreWatcher.java   |  2 +-
 .../accumulo/server/problems/ProblemReports.java   |  2 +-
 .../apache/accumulo/server/rpc/TServerUtils.java   | 12 ++--
 .../server/util/RemoveEntriesForMissingFiles.java  |  6 +-
 .../server/util/VerifyTabletAssignments.java       |  6 +-
 .../server/conf/store/impl/ReadyMonitorTest.java   |  2 +-
 .../manager/tableOps/bulkVer2/BulkImportMove.java  |  4 +-
 .../tableOps/tableImport/MoveExportedFiles.java    |  4 +-
 .../manager/upgrade/UpgradeCoordinator.java        |  6 +-
 .../org/apache/accumulo/tserver/ScanServer.java    |  5 +-
 .../tserver/TabletServerResourceManager.java       | 43 ++++++++-----
 .../org/apache/accumulo/tserver/log/LogSorter.java |  3 +-
 .../accumulo/tserver/log/TabletServerLogger.java   |  3 +-
 .../accumulo/test/BalanceWithOfflineTableIT.java   |  4 +-
 .../test/functional/BatchWriterFlushIT.java        |  2 +-
 29 files changed, 231 insertions(+), 84 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
index 07ff915c95,62efab71f5..80282318f4
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
@@@ -24,6 -24,9 +24,8 @@@ import static com.google.common.base.Su
  import static java.nio.charset.StandardCharsets.UTF_8;
  import static java.util.concurrent.TimeUnit.MILLISECONDS;
  import static java.util.concurrent.TimeUnit.SECONDS;
 -import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
+ import static org.apache.accumulo.core.util.threads.ThreadPoolNames.CONDITIONAL_WRITER_CLEANUP_POOL;
+ import static org.apache.accumulo.core.util.threads.ThreadPoolNames.SCANNER_READ_AHEAD_POOL;
  
  import java.lang.Thread.UncaughtExceptionHandler;
  import java.net.URL;

diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
index 610e5865fc,00ebbeb363..a4445d48d7
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
@@@ -377,8 -378,8 +378,8 @@@ public class ConditionalWriterImpl impl
      this.auths = config.getAuthorizations();
      this.accessEvaluator = AccessEvaluator.of(config.getAuthorizations().toAccessAuthorizations());
      this.threadPool = context.threadPools().createScheduledExecutorService(
-         config.getMaxWriteThreads(), this.getClass().getSimpleName());
+         config.getMaxWriteThreads(), CONDITIONAL_WRITER_POOL.poolName);
-     this.locator = new SyncingTabletLocator(context, tableId);
+     this.locator = new SyncingClientTabletCache(context, tableId);
      this.serverQueues = new HashMap<>();
      this.tableId = tableId;
      this.tableName = tableName;

diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
index 43b108e1d1,91a1248c97..063c96c238
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
@@@ -40,7 -30,7 +40,9 @@@ import static org.apache.accumulo.core.
  import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
  import static org.apache.accumulo.core.util.Validators.EXISTING_TABLE_NAME;
  import static org.apache.accumulo.core.util.Validators.NEW_TABLE_NAME;
-import static org.apache.accumulo.core.util.threads.ThreadPoolNames.SPLIT_POOL;
+import static org.apache.accumulo.core.util.Validators.NOT_BUILTIN_TABLE;
++import static org.apache.accumulo.core.util.threads.ThreadPoolNames.SPLIT_START_POOL;
++import static org.apache.accumulo.core.util.threads.ThreadPoolNames.SPLIT_WAIT_POOL;
  
  import java.io.BufferedReader;
  import java.io.FileNotFoundException;
@@@ -466,161 -423,192 +468,161 @@@ public class TableOperationsImpl extend
      }
    }
  
-   private static class SplitEnv {
-     private final String tableName;
-     private final TableId tableId;
-     private final ExecutorService executor;
-     private final CountDownLatch latch;
-     private final AtomicReference<Exception> exception;
+   /**
+    * On the server side the fate operation will exit w/o an error if the tablet requested to split
+    * does not exist. When this happens it will also return an empty string. In the case where the
+    * fate operation successfully splits the tablet it will return the following string. This code
+    * uses this return value to see if it needs to retry finding the tablet.
+    */
+   public static final String SPLIT_SUCCESS_MSG = "SPLIT_SUCCEEDED";
  
-     SplitEnv(String tableName, TableId tableId, ExecutorService executor, CountDownLatch latch,
-         AtomicReference<Exception> exception) {
-       this.tableName = tableName;
-       this.tableId = tableId;
-       this.executor = executor;
-       this.latch = latch;
-       this.exception = exception;
-     }
-   }
+   @Override
+   public void addSplits(String tableName, SortedSet<Text> splits)
+       throws AccumuloException, TableNotFoundException, AccumuloSecurityException {
  
-   private class SplitTask implements Runnable {
+     EXISTING_TABLE_NAME.validate(tableName);
  
-     private List<Text> splits;
-     private SplitEnv env;
+     TableId tableId = context.getTableId(tableName);
  
-     SplitTask(SplitEnv env, List<Text> splits) {
-       this.env = env;
-       this.splits = splits;
-     }
+     // TODO should there be a server side check for this?
+     context.requireNotOffline(tableId, tableName);
  
-     @Override
-     public void run() {
-       try {
-         if (env.exception.get() != null) {
-           return;
-         }
+     ClientTabletCache tabLocator = ClientTabletCache.getInstance(context, tableId);
  
-         if (splits.size() <= 2) {
-           addSplits(env, new TreeSet<>(splits));
-           splits.forEach(s -> env.latch.countDown());
-           return;
-         }
+     SortedSet<Text> splitsTodo = Collections.synchronizedSortedSet(new TreeSet<>(splits));
  
-         int mid = splits.size() / 2;
+     final ByteBuffer EMPTY = ByteBuffer.allocate(0);
  
-         // split the middle split point to ensure that child task split
-         // different tablets and can therefore run in parallel
-         addSplits(env, new TreeSet<>(splits.subList(mid, mid + 1)));
-         env.latch.countDown();
+     ExecutorService startExecutor =
-         context.threadPools().getPoolBuilder("addSplitsStart").numCoreThreads(16).build();
++        context.threadPools().getPoolBuilder(SPLIT_START_POOL).numCoreThreads(16).build();
+     ExecutorService waitExecutor =
-         context.threadPools().getPoolBuilder("addSplitsWait").numCoreThreads(16).build();
++        context.threadPools().getPoolBuilder(SPLIT_WAIT_POOL).numCoreThreads(16).build();
  
-         env.executor.execute(new SplitTask(env, splits.subList(0, mid)));
-         env.executor.execute(new SplitTask(env, splits.subList(mid + 1, splits.size())));
+     while (!splitsTodo.isEmpty()) {
  
-       } catch (Exception t) {
-         env.exception.compareAndSet(null, t);
-       }
-     }
+       tabLocator.invalidateCache();
  
-   }
+       Map<KeyExtent,List<Text>> tabletSplits =
+           mapSplitsToTablets(tableName, tableId, tabLocator, splitsTodo);
  
-   @Override
-   public void addSplits(String tableName, SortedSet<Text> partitionKeys)
-       throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
-     EXISTING_TABLE_NAME.validate(tableName);
+       List<CompletableFuture<Void>> futures = new ArrayList<>();
  
-     TableId tableId = context.getTableId(tableName);
-     List<Text> splits = new ArrayList<>(partitionKeys);
+       // begin the fate operation for each tablet without waiting for the operation to complete
+       for (Entry<KeyExtent,List<Text>> splitsForTablet : tabletSplits.entrySet()) {
+         CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
+           var extent = splitsForTablet.getKey();
  
-     // should be sorted because we copied from a sorted set, but that makes
-     // assumptions about how the copy was done so resort to be sure.
-     Collections.sort(splits);
-     CountDownLatch latch = new CountDownLatch(splits.size());
-     AtomicReference<Exception> exception = new AtomicReference<>(null);
+           List<ByteBuffer> args = new ArrayList<>();
+           args.add(ByteBuffer.wrap(extent.tableId().canonical().getBytes(UTF_8)));
+           args.add(extent.endRow() == null ? EMPTY : TextUtil.getByteBuffer(extent.endRow()));
+           args.add(
+               extent.prevEndRow() == null ? EMPTY : TextUtil.getByteBuffer(extent.prevEndRow()));
+           splitsForTablet.getValue().forEach(split -> args.add(TextUtil.getByteBuffer(split)));
  
-     ExecutorService executor =
-         context.threadPools().getPoolBuilder(SPLIT_POOL).numCoreThreads(16).build();
-     try {
-       executor.execute(
-           new SplitTask(new SplitEnv(tableName, tableId, executor, latch, exception), splits));
-
-       while (!latch.await(100, MILLISECONDS)) {
-         if (exception.get() != null) {
-           executor.shutdownNow();
-           Throwable excep = exception.get();
-           // Below all exceptions are wrapped and rethrown. This is done so that the user knows what
-           // code path got them here. If the wrapping was not done, the
-           // user would only have the stack trace for the background thread.
-           if (excep instanceof TableNotFoundException) {
-             TableNotFoundException tnfe = (TableNotFoundException) excep;
-             throw new TableNotFoundException(tableId.canonical(), tableName,
-                 "Table not found by background thread", tnfe);
-           } else if (excep instanceof TableOfflineException) {
-             log.debug("TableOfflineException occurred in background thread. Throwing new exception",
-                 excep);
-             throw new TableOfflineException(tableId, tableName);
-           } else if (excep instanceof AccumuloSecurityException) {
-             // base == background accumulo security exception
-             AccumuloSecurityException base = (AccumuloSecurityException) excep;
-             throw new AccumuloSecurityException(base.getUser(), base.asThriftException().getCode(),
-                 base.getTableInfo(), excep);
-           } else if (excep instanceof AccumuloServerException) {
-             throw new AccumuloServerException((AccumuloServerException) excep);
-           } else if (excep instanceof Error) {
-             throw new Error(excep);
-           } else {
-             throw new AccumuloException(excep);
+           try {
+             return handleFateOperation(() -> {
+               TFateInstanceType t = FateInstanceType.fromNamespaceOrTableName(tableName).toThrift();
+               TFateId opid = beginFateOperation(t);
+               executeFateOperation(opid, FateOperation.TABLE_SPLIT, args, Map.of(), false);
+               return new Pair<>(opid, splitsForTablet.getValue());
+             }, tableName);
+           } catch (TableExistsException | NamespaceExistsException | NamespaceNotFoundException
+               | AccumuloSecurityException | TableNotFoundException | AccumuloException e) {
+             // This exception type is used because it makes it easier in the foreground thread to do
+             // exception analysis when using CompletableFuture.
+             throw new CompletionException(e);
            }
-         }
-       }
-     } catch (InterruptedException e) {
-       throw new IllegalStateException(e);
-     } finally {
-       executor.shutdown();
-     }
-   }
-
-   private void addSplits(SplitEnv env, SortedSet<Text> partitionKeys) throws AccumuloException,
-       AccumuloSecurityException, TableNotFoundException, AccumuloServerException {
+           // wait for the fate operation to complete in a separate thread pool
+         }, startExecutor).thenApplyAsync(pair -> {
+           final TFateId opid = pair.getFirst();
+           final List<Text> completedSplits = pair.getSecond();
  
-     TabletLocator tabLocator = TabletLocator.getLocator(context, env.tableId);
-     for (Text split : partitionKeys) {
-       boolean successful = false;
-       int attempt = 0;
-       long locationFailures = 0;
+           try {
+             String status = handleFateOperation(() -> waitForFateOperation(opid), tableName);
  
-       while (!successful) {
+             if (SPLIT_SUCCESS_MSG.equals(status)) {
+               completedSplits.forEach(splitsTodo::remove);
+             }
+           } catch (TableExistsException | NamespaceExistsException | NamespaceNotFoundException
+               | AccumuloSecurityException | TableNotFoundException | AccumuloException e) {
+             // This exception type is used because it makes it easier in the foreground thread to do
+             // exception analysis when using CompletableFuture.
+             throw new CompletionException(e);
+           } finally {
+             // always finish table op, even when exception
+             if (opid != null) {
+               try {
+                 finishFateOperation(opid);
+               } catch (Exception e) {
+                 log.warn("Exception thrown while finishing fate table operation", e);
+               }
+             }
+           }
+           return null;
+         }, waitExecutor);
+         futures.add(future);
+       }
  
-         if (attempt > 0) {
-           sleepUninterruptibly(100, MILLISECONDS);
+       try {
+         futures.forEach(CompletableFuture::join);
+       } catch (CompletionException ee) {
+         Throwable excep = ee.getCause();
+         // Below all exceptions are wrapped and rethrown. This is done so that the user knows
+         // what code path got them here. If the wrapping was not done, the user would only
+         // have the stack trace for the background thread.
+         if (excep instanceof TableNotFoundException) {
+           TableNotFoundException tnfe = (TableNotFoundException) excep;
+           throw new TableNotFoundException(tableId.canonical(), tableName,
+               "Table not found by background thread", tnfe);
+         } else if (excep instanceof TableOfflineException) {
+           log.debug("TableOfflineException occurred in background thread. Throwing new exception",
+               excep);
+           throw new TableOfflineException(tableId, tableName);
+         } else if (excep instanceof AccumuloSecurityException) {
+           // base == background accumulo security exception
+           AccumuloSecurityException base = (AccumuloSecurityException) excep;
+           throw new AccumuloSecurityException(base.getUser(), base.asThriftException().getCode(),
+               base.getTableInfo(), excep);
+         } else if (excep instanceof AccumuloServerException) {
+           throw new AccumuloServerException((AccumuloServerException) excep);
+         } else {
+           throw new AccumuloException(excep);
          }
+       }
  
-         attempt++;
+     }
+     startExecutor.shutdown();
+     waitExecutor.shutdown();
+   }
  
-         TabletLocation tl = tabLocator.locateTablet(context, split, false, false);
+   private Map<KeyExtent,List<Text>> mapSplitsToTablets(String tableName, TableId tableId,
+       ClientTabletCache tabLocator, SortedSet<Text> splitsTodo)
+       throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+     Map<KeyExtent,List<Text>> tabletSplits = new HashMap<>();
  
-         if (tl == null) {
-           context.requireTableExists(env.tableId, env.tableName);
-           context.requireNotOffline(env.tableId, env.tableName);
-           continue;
-         }
+     var iterator = splitsTodo.iterator();
+     while (iterator.hasNext()) {
+       var split = iterator.next();
  
-         HostAndPort address = HostAndPort.fromString(tl.getTserverLocation());
+       try {
+         Retry retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100))
+             .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(2)).backOffFactor(1.5)
+             .logInterval(Duration.ofMinutes(3)).createRetry();
  
-         try {
-           TabletManagementClientService.Client client =
-               ThriftUtil.getClient(ThriftClientTypes.TABLET_MGMT, address, context);
+         var tablet = tabLocator.findTablet(context, split, false, LocationNeed.NOT_REQUIRED);
+         while (tablet == null) {
+           context.requireTableExists(tableId, tableName);
            try {
-
-             OpTimer timer = null;
-
-             if (log.isTraceEnabled()) {
-               log.trace("tid={} Splitting tablet {} on {} at {}", Thread.currentThread().getId(),
-                   tl.getExtent(), address, split);
-               timer = new OpTimer().start();
-             }
-
-             client.splitTablet(TraceUtil.traceInfo(), context.rpcCreds(), tl.getExtent().toThrift(),
-                 TextUtil.getByteBuffer(split));
-
-             // just split it, might as well invalidate it in the cache
-             tabLocator.invalidateCache(tl.getExtent());
-
-             if (timer != null) {
-               timer.stop();
-               log.trace("Split tablet in {}", String.format("%.3f secs", timer.scale(SECONDS)));
-             }
-
-           } finally {
-             ThriftUtil.returnClient(client, context);
-           }
-
-         } catch (TApplicationException tae) {
-           throw new AccumuloServerException(address.toString(), tae);
-         } catch (ThriftSecurityException e) {
-           context.clearTableListCache();
-           context.requireTableExists(env.tableId, env.tableName);
-           throw new AccumuloSecurityException(e.user, e.code, e);
-         } catch (NotServingTabletException e) {
-           // Do not silently spin when we repeatedly fail to get the location for a tablet
-           locationFailures++;
-           if (locationFailures == 5 || locationFailures % 50 == 0) {
-             log.warn("Having difficulty locating hosting tabletserver for split {} on table {}."
-                 + " Seen {} failures.", split, env.tableName, locationFailures);
+             retry.waitForNextAttempt(log, "Find tablet in " + tableId + " containing " + split);
+           } catch (InterruptedException e) {
+             throw new RuntimeException(e);
            }
+           tablet = tabLocator.findTablet(context, split, false, LocationNeed.NOT_REQUIRED);
+         }
  
-           tabLocator.invalidateCache(tl.getExtent());
-           continue;
-         } catch (TException e) {
-           tabLocator.invalidateCache(context, tl.getTserverLocation());
+         if (split.equals(tablet.getExtent().endRow())) {
+           // split already exists, so remove it
+           iterator.remove();
            continue;
          }
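The rewritten addSplits above drops the recursive SplitTask/CountDownLatch approach in favor of two pools joined by CompletableFuture: the start pool begins a fate operation per tablet while the wait pool blocks on completion, so slow waits cannot starve new submissions. Below is a minimal, self-contained sketch of that start/wait pattern; the class name, pool sizes, and task bodies are illustrative stand-ins, not the Accumulo fate API.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CompletionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class StartWaitPattern {
      public static void main(String[] args) {
        // One pool submits operations, a second pool blocks waiting on results,
        // so slow waits cannot hold up the submission of new operations.
        ExecutorService startExecutor = Executors.newFixedThreadPool(16);
        ExecutorService waitExecutor = Executors.newFixedThreadPool(16);

        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
          final int op = i;
          CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            // stage 1: begin the operation, returning a handle
            // (a stand-in for the fate operation id in the diff above)
            return "op-" + op;
          }, startExecutor).thenApplyAsync(handle -> {
            // stage 2: wait for completion; checked exceptions would be
            // wrapped in CompletionException here, as in the diff above
            System.out.println(handle + " finished");
            return null;
          }, waitExecutor);
          futures.add(future);
        }

        try {
          futures.forEach(CompletableFuture::join);
        } catch (CompletionException e) {
          // join() surfaces the wrapped cause for foreground exception analysis
          throw new RuntimeException(e.getCause());
        } finally {
          startExecutor.shutdown();
          waitExecutor.shutdown();
        }
      }
    }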
table {}." - + " Seen {} failures.", split, env.tableName, locationFailures); + retry.waitForNextAttempt(log, "Find tablet in " + tableId + " containing " + split); + } catch (InterruptedException e) { + throw new RuntimeException(e); } + tablet = tabLocator.findTablet(context, split, false, LocationNeed.NOT_REQUIRED); + } - tabLocator.invalidateCache(tl.getExtent()); - continue; - } catch (TException e) { - tabLocator.invalidateCache(context, tl.getTserverLocation()); + if (split.equals(tablet.getExtent().endRow())) { + // split already exists, so remove it + iterator.remove(); continue; } diff --cc core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java index b52ed9f867,c993cf4cb1..5aa91afaa1 --- a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java @@@ -221,11 -224,10 +224,11 @@@ public class ExternalCompactionUtil public static List<RunningCompaction> getCompactionsRunningOnCompactors(ClientContext context) { final List<RunningCompactionFuture> rcFutures = new ArrayList<>(); final ExecutorService executor = ThreadPools.getServerThreadPools() - .getPoolBuilder("CompactorRunningCompactions").numCoreThreads(16).build(); + .getPoolBuilder(COMPACTOR_RUNNING_COMPACTIONS_POOL).numCoreThreads(16).build(); - getCompactorAddrs(context).forEach((q, hp) -> { + + getCompactorAddrs(context).forEach((group, hp) -> { hp.forEach(hostAndPort -> { - rcFutures.add(new RunningCompactionFuture(q, hostAndPort, + rcFutures.add(new RunningCompactionFuture(group, hostAndPort, executor.submit(() -> getRunningCompaction(hostAndPort, context)))); }); }); diff --cc core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java index 0000000000,a27ae864ea..c000defb00 mode 000000,100644..100644 --- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java +++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java @@@ -1,0 -1,70 +1,71 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + package org.apache.accumulo.core.util.threads; + + public enum ThreadPoolNames { + + ACCUMULO_POOL_PREFIX("accumulo.pool"), + BATCH_WRITER_SEND_POOL("accumulo.pool.batch.writer.send"), + BATCH_WRITER_BIN_MUTATIONS_POOL("accumulo.pool.batch.writer.bin.mutations"), + BLOOM_LOADER_POOL("accumulo.pool.bloom.loader"), + BULK_IMPORT_CLIENT_LOAD_POOL("accumulo.pool.bulk.import.client.bulk.load"), + BULK_IMPORT_CLIENT_BULK_THREADS_POOL("accumulo.pool.bulk.import.client.bulk.threads"), + BULK_IMPORT_DIR_MOVE_POOL("accumulo.pool.bulk.dir.move"), + COMPACTION_SERVICE_COMPACTION_PLANNER_POOL("accumulo.pool.compaction.service.compaction.planner"), + COMPACTOR_RUNNING_COMPACTIONS_POOL("accumulo.pool.compactor.running.compactions"), + COMPACTOR_RUNNING_COMPACTION_IDS_POOL("accumulo.pool.compactor.running.compaction.ids"), + CONDITIONAL_WRITER_POOL("accumulo.pool.conditional.writer"), + CONDITIONAL_WRITER_CLEANUP_POOL("accumulo.pool.client.context.conditional.writer.cleanup"), + COORDINATOR_FINALIZER_BACKGROUND_POOL("accumulo.pool.compaction.finalizer.background.pool"), + COORDINATOR_FINALIZER_NOTIFIER_POOL("accumulo.pool.compaction.coordinator.compaction.finalizer"), + GC_DELETE_POOL("accumulo.pool.gc.threads.delete"), + GENERAL_SERVER_POOL("accumulo.pool.general.server"), + IMPORT_TABLE_RENAME_POOL("accumulo.pool.import.table.rename"), + INSTANCE_OPS_COMPACTIONS_FINDER_POOL("accumulo.pool.instance.ops.active.compactions.finder"), + MANAGER_FATE_POOL("accumulo.pool.manager.fate"), + MANAGER_STATUS_POOL("accumulo.pool.manager.status"), + MANAGER_UPGRADE_COORDINATOR_METADATA_POOL("accumulo.pool.manager.upgrade.metadata"), + METADATA_DEFAULT_SPLIT_POOL("accumulo.pool.metadata.tablet.default.splitter"), + METADATA_TABLET_MIGRATION_POOL("accumulo.pool.metadata.tablet.migration"), + METADATA_TABLET_ASSIGNMENT_POOL("accumulo.pool.metadata.tablet.assignment"), + SCAN_SERVER_TABLET_METADATA_CACHE_POOL("accumulo.pool.scan.server.tablet.metadata.cache"), + SCANNER_READ_AHEAD_POOL("accumulo.pool.client.context.scanner.read.ahead"), + SCHED_FUTURE_CHECKER_POOL("accumulo.pool.scheduled.future.checker"), - SPLIT_POOL("accumulo.pool.table.ops.add.splits"), ++ SPLIT_START_POOL("accumulo.pool.table.ops.add.splits.start"), ++ SPLIT_WAIT_POOL("accumulo.pool.table.ops.add.splits.wait"), + TABLET_ASSIGNMENT_POOL("accumulo.pool.tablet.assignment.pool"), + TSERVER_ASSIGNMENT_POOL("accumulo.pool.tserver.assignment"), + TSERVER_MIGRATIONS_POOL("accumulo.pool.tserver.migrations"), + TSERVER_MINOR_COMPACTOR_POOL("accumulo.pool.tserver.minor.compactor"), + TSERVER_SUMMARY_FILE_RETRIEVER_POOL("accumulo.pool.tserver.summary.file.retriever.pool"), + TSERVER_SUMMARY_PARTITION_POOL("accumulo.pool.tserver.summary.partition"), + TSERVER_SUMMARY_REMOTE_POOL("accumulo.pool.tserver.summary.remote"), + TSERVER_SUMMARY_RETRIEVAL_POOL("accumulo.pool.tserver.summary.retrieval"), + TSERVER_TABLET_MIGRATION_POOL("accumulo.pool.tserver.tablet.migration"), + TSERVER_WAL_CREATOR_POOL("accumulo.pool.tserver.wal.creator"), + TSERVER_WAL_SORT_CONCURRENT_POOL("accumulo.pool.tserver.wal.sort.concurrent"), + UTILITY_CHECK_FILE_TASKS("accumulo.pool.util.check.file.tasks"), + UTILITY_VERIFY_TABLET_ASSIGNMENTS("accumulo.pool.util.check.tablet.servers"); + + public final String poolName; + + ThreadPoolNames(String poolName) { + this.poolName = poolName; + } + } diff --cc server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java index 3b5b2fbbee,765065d2b8..cad6ff2b09 --- 
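ThreadPoolNames gives every pool a stable name under the shared "accumulo.pool" prefix, which is what the getPoolBuilder(...) call sites elsewhere in this commit consume. A generic sketch of the same pattern follows; newNamedPool is a hypothetical helper, not Accumulo's ThreadPools API, shown only to illustrate how an enum-backed name can flow into thread naming.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.atomic.AtomicInteger;

    public class PoolNamingSketch {

      // Mirrors the ThreadPoolNames pattern: every pool name shares the
      // "accumulo.pool" prefix so metrics and thread dumps group cleanly.
      enum PoolName {
        SPLIT_START_POOL("accumulo.pool.table.ops.add.splits.start"),
        SPLIT_WAIT_POOL("accumulo.pool.table.ops.add.splits.wait");

        final String poolName;

        PoolName(String poolName) {
          this.poolName = poolName;
        }
      }

      // Hypothetical helper: names each thread after its pool plus a counter.
      static ExecutorService newNamedPool(PoolName name, int threads) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory =
            r -> new Thread(r, name.poolName + "-" + counter.getAndIncrement());
        return Executors.newFixedThreadPool(threads, factory);
      }

      public static void main(String[] args) {
        ExecutorService pool = newNamedPool(PoolName.SPLIT_START_POOL, 2);
        pool.execute(() -> System.out.println(Thread.currentThread().getName()));
        pool.shutdown();
      }
    }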
diff --cc server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java
index 3b5b2fbbee,765065d2b8..cad6ff2b09
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java
@@@ -105,7 -117,7 +107,7 @@@ class BulkImportMove extends ManagerRep
          oldToNewMap.put(originalPath, newPath);
        }
        try {
-         fs.bulkRename(oldToNewMap, workerCount, "bulkDir move", fateId);
-         fs.bulkRename(oldToNewMap, workerCount, BULK_IMPORT_DIR_MOVE_POOL.poolName, fmtTid);
++        fs.bulkRename(oldToNewMap, workerCount, BULK_IMPORT_DIR_MOVE_POOL.poolName, fateId);
        } catch (IOException ioe) {
          throw new AcceptableThriftTableOperationException(bulkInfo.tableId.canonical(), null,
              TableOperation.BULK_IMPORT, TableOperationExceptionType.OTHER,

diff --cc server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/MoveExportedFiles.java
index d6a328405a,1bc30fc73c..12adab912a
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/MoveExportedFiles.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/MoveExportedFiles.java
@@@ -110,7 -114,7 +112,7 @@@ class MoveExportedFiles extends Manager
        }
      }
      try {
-       fs.bulkRename(oldToNewPaths, workerCount, "importtable rename", fateId);
-       fs.bulkRename(oldToNewPaths, workerCount, IMPORT_TABLE_RENAME_POOL.poolName, fmtTid);
++      fs.bulkRename(oldToNewPaths, workerCount, IMPORT_TABLE_RENAME_POOL.poolName, fateId);
      } catch (IOException ioe) {
        throw new AcceptableThriftTableOperationException(tableInfo.tableId.canonical(), null,
            TableOperation.IMPORT, TableOperationExceptionType.OTHER, ioe.getCause().getMessage());

diff --cc server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
index 2f4f7dd2ff,a463707fec..14d5fa0dfe
--- a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
@@@ -19,7 -19,7 +19,8 @@@
  package org.apache.accumulo.manager.upgrade;
  
  import static java.util.concurrent.TimeUnit.SECONDS;
+ import static org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_UPGRADE_COORDINATOR_METADATA_POOL;
+import static org.apache.accumulo.server.AccumuloDataVersion.METADATA_FILE_JSON_ENCODING;
  import static org.apache.accumulo.server.AccumuloDataVersion.REMOVE_DEPRECATIONS_FOR_VERSION_3;
  import static org.apache.accumulo.server.AccumuloDataVersion.ROOT_TABLET_META_CHANGES;

diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
index 2edb357988,f684b72291..c8516d103d
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
@@@ -253,12 -257,10 +254,12 @@@ public class ScanServer extends Abstrac
          "Tablet metadata cache refresh percentage is '%s' but must be less than 1",
          cacheRefreshPercentage);
  
-     tmCacheExecutor = context.threadPools().getPoolBuilder("scanServerTmCache").numCoreThreads(8)
-         .enableThreadPoolMetrics().build();
+     tmCacheExecutor = context.threadPools().getPoolBuilder(SCAN_SERVER_TABLET_METADATA_CACHE_POOL)
+         .numCoreThreads(8).enableThreadPoolMetrics().build();
-     var builder = Caffeine.newBuilder().expireAfterWrite(cacheExpiration, TimeUnit.MILLISECONDS)
-         .scheduler(Scheduler.systemScheduler()).executor(tmCacheExecutor).recordStats();
+     var builder =
+         context.getCaches().createNewBuilder(CacheName.SCAN_SERVER_TABLET_METADATA, true)
+             .expireAfterWrite(cacheExpiration, TimeUnit.MILLISECONDS)
+             .scheduler(Scheduler.systemScheduler()).executor(tmCacheExecutor).recordStats();
      if (cacheRefreshPercentage > 0) {
        // Compute the refresh time as a percentage of the expiration time
        // Cache hits after this time, but before expiration, will trigger a background
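The ScanServer change keeps the same Caffeine cache settings (write expiration, scheduler, executor, stats) but obtains the builder through context.getCaches(). Below is a standalone sketch of those Caffeine knobs using plain Caffeine.newBuilder(), with illustrative expiration and refresh values rather than the Accumulo cache manager; the refresh-before-expiry logic mirrors the comment in the hunk above.

    import com.github.benmanes.caffeine.cache.Caffeine;
    import com.github.benmanes.caffeine.cache.LoadingCache;
    import com.github.benmanes.caffeine.cache.Scheduler;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class TmCacheSketch {
      public static void main(String[] args) {
        long cacheExpiration = 60_000;        // ms; illustrative value
        double cacheRefreshPercentage = 0.75; // refresh after 75% of expiration

        var executor = Executors.newFixedThreadPool(8);
        var builder = Caffeine.newBuilder()
            .expireAfterWrite(cacheExpiration, TimeUnit.MILLISECONDS)
            .scheduler(Scheduler.systemScheduler()) // prompt eviction without reads
            .executor(executor)                     // maintenance + refresh run here
            .recordStats();
        if (cacheRefreshPercentage > 0) {
          // Hits after this point, but before expiration, trigger a background reload
          long refreshMillis = (long) (cacheExpiration * cacheRefreshPercentage);
          builder.refreshAfterWrite(refreshMillis, TimeUnit.MILLISECONDS);
        }
        LoadingCache<String,String> cache = builder.build(key -> "metadata-for-" + key);
        System.out.println(cache.get("tablet-1"));
        executor.shutdown();
      }
    }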
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index cfa95cd269,46a96e6b5c..1ee36fac51
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@@ -23,6 -23,17 +23,15 @@@ import static java.util.Objects.require
  import static java.util.concurrent.TimeUnit.MILLISECONDS;
  import static java.util.concurrent.TimeUnit.SECONDS;
  import static java.util.stream.Collectors.toUnmodifiableMap;
+ import static org.apache.accumulo.core.util.threads.ThreadPoolNames.ACCUMULO_POOL_PREFIX;
-import static org.apache.accumulo.core.util.threads.ThreadPoolNames.METADATA_DEFAULT_SPLIT_POOL;
+ import static org.apache.accumulo.core.util.threads.ThreadPoolNames.METADATA_TABLET_ASSIGNMENT_POOL;
+ import static org.apache.accumulo.core.util.threads.ThreadPoolNames.METADATA_TABLET_MIGRATION_POOL;
-import static org.apache.accumulo.core.util.threads.ThreadPoolNames.SPLIT_POOL;
+ import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TABLET_ASSIGNMENT_POOL;
+ import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_MINOR_COMPACTOR_POOL;
+ import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_FILE_RETRIEVER_POOL;
+ import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_PARTITION_POOL;
+ import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_REMOTE_POOL;
+ import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_TABLET_MIGRATION_POOL;
  
  import java.io.IOException;
  import java.util.ArrayList;
@@@ -301,14 -316,21 +312,14 @@@ public class TabletServerResourceManage
      }
  
      minorCompactionThreadPool = ThreadPools.getServerThreadPools().createExecutorService(acuConf,
-         Property.TSERV_MINC_MAXCONCURRENT, true);
+         Property.TSERV_MINC_MAXCONCURRENT, enableMetrics);
      modifyThreadPoolSizesAtRuntime(
          () -> context.getConfiguration().getCount(Property.TSERV_MINC_MAXCONCURRENT),
-         "minor compactor", minorCompactionThreadPool);
+         TSERVER_MINOR_COMPACTOR_POOL.poolName, minorCompactionThreadPool);
  
-     splitThreadPool = ThreadPools.getServerThreadPools().getPoolBuilder(SPLIT_POOL)
-         .numCoreThreads(0).numMaxThreads(1).withTimeOut(1, SECONDS).build();
-
-     defaultSplitThreadPool =
-         ThreadPools.getServerThreadPools().getPoolBuilder(METADATA_DEFAULT_SPLIT_POOL)
-             .numCoreThreads(0).numMaxThreads(1).withTimeOut(60, SECONDS).build();
-
      defaultMigrationPool =
-         ThreadPools.getServerThreadPools().getPoolBuilder("metadata tablet migration")
+         ThreadPools.getServerThreadPools().getPoolBuilder(METADATA_TABLET_MIGRATION_POOL)
-             .numCoreThreads(0).numMaxThreads(1).withTimeOut(60, SECONDS).build();
+             .numCoreThreads(0).numMaxThreads(1).withTimeOut(60L, SECONDS).build();
  
      migrationPool = ThreadPools.getServerThreadPools().createExecutorService(acuConf,
          Property.TSERV_MIGRATE_MAXCONCURRENT, enableMetrics);

diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
index 030720c93d,ec86d82a25..1fdae2890e
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
@@@ -291,31 -291,13 +292,31 @@@ public class LogSorter
      }
    }
  
-   public void startWatchingForRecoveryLogs() throws KeeperException, InterruptedException {
-     int threadPoolSize = this.conf.getCount(Property.TSERV_WAL_SORT_MAX_CONCURRENT);
+   /**
+    * Sort any logs that need sorting in the current thread.
+    *
+    * @return The time in millis when the next check can be done.
+    */
+   public long sortLogsIfNeeded() throws KeeperException, InterruptedException {
+     DistributedWorkQueue dwq = new DistributedWorkQueue(
+         context.getZooKeeperRoot() + Constants.ZRECOVERY, sortedLogConf, context);
+     dwq.processExistingWork(new LogProcessor(), MoreExecutors.newDirectExecutorService(), 1, false);
+     return System.currentTimeMillis() + dwq.getCheckInterval();
+   }
+
+   /**
+    * Sort any logs that need sorting in a ThreadPool using
+    * {@link Property#TSERV_WAL_SORT_MAX_CONCURRENT} threads. This method will start a background
+    * thread to look for log sorting work in the future that will be processed by the
+    * ThreadPoolExecutor
+    */
+   public void startWatchingForRecoveryLogs(int threadPoolSize)
+       throws KeeperException, InterruptedException {
      ThreadPoolExecutor threadPool =
-         ThreadPools.getServerThreadPools().getPoolBuilder(this.getClass().getName())
+         ThreadPools.getServerThreadPools().getPoolBuilder(TSERVER_WAL_SORT_CONCURRENT_POOL)
              .numCoreThreads(threadPoolSize).enableThreadPoolMetrics().build();
      new DistributedWorkQueue(context.getZooKeeperRoot() + Constants.ZRECOVERY, sortedLogConf,
-         context).startProcessing(new LogProcessor(), threadPool);
+         context).processExistingAndFuture(new LogProcessor(), threadPool);
    }
  
    public List<RecoveryStatus> getLogSorts() {
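The new sortLogsIfNeeded contract returns the epoch time at which the next check may run. A hypothetical caller loop honoring that contract might look like the sketch below; the stub method and its interval are invented for illustration, and only the return-value convention comes from the javadoc above.

    import java.util.concurrent.TimeUnit;

    public class SortPollingSketch {

      // Stand-in for LogSorter.sortLogsIfNeeded(): sorts in the current thread
      // and returns the epoch millis when the next check may run.
      static long sortLogsIfNeeded() {
        System.out.println("checked for recovery logs to sort");
        return System.currentTimeMillis() + 5_000; // pretend the interval is 5s
      }

      public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 3; i++) {
          long nextCheck = sortLogsIfNeeded();
          long sleepMs = Math.max(0, nextCheck - System.currentTimeMillis());
          TimeUnit.MILLISECONDS.sleep(sleepMs);
        }
      }
    }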