This is an automated email from the ASF dual-hosted git repository. ddanielr pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 9ec22b4b03e979ca5c238a3dd123c8a7fc1dd3db Merge: 11b1f48f03 b90ca95323 Author: Daniel Roberts <ddani...@gmail.com> AuthorDate: Sun Jul 28 23:08:53 2024 +0000 Merge branch '2.1' assemble/bin/accumulo-cluster | 2 +- .../accumulo/core/clientImpl/ClientContext.java | 10 ++-- .../core/clientImpl/ConditionalWriterImpl.java | 3 +- .../core/clientImpl/InstanceOperationsImpl.java | 3 +- .../core/clientImpl/TableOperationsImpl.java | 3 +- .../core/clientImpl/TabletServerBatchReader.java | 8 ++- .../core/clientImpl/TabletServerBatchWriter.java | 8 ++- .../accumulo/core/clientImpl/bulk/BulkImport.java | 12 ++-- .../accumulo/core/file/BloomFilterLayer.java | 3 +- .../util/compaction/ExternalCompactionUtil.java | 7 ++- .../core/util/threads/ThreadPoolNames.java | 70 ++++++++++++++++++++++ .../accumulo/core/util/threads/ThreadPools.java | 60 ++++++++++++++----- .../core/file/rfile/MultiThreadedRFileTest.java | 2 +- .../threads/ThreadPoolExecutorBuilderTest.java | 14 ++--- .../conf/store/impl/PropCacheCaffeineImpl.java | 4 +- .../server/conf/store/impl/PropStoreWatcher.java | 2 +- .../accumulo/server/problems/ProblemReports.java | 2 +- .../apache/accumulo/server/rpc/TServerUtils.java | 12 ++-- .../server/util/RemoveEntriesForMissingFiles.java | 6 +- .../server/util/VerifyTabletAssignments.java | 6 +- .../server/conf/store/impl/ReadyMonitorTest.java | 2 +- .../accumulo/coordinator/CompactionFinalizer.java | 6 +- .../manager/tableOps/bulkVer2/BulkImportMove.java | 4 +- .../tableOps/tableImport/MoveExportedFiles.java | 4 +- .../manager/upgrade/UpgradeCoordinator.java | 6 +- .../org/apache/accumulo/tserver/ScanServer.java | 5 +- .../tserver/TabletServerResourceManager.java | 60 +++++++++++-------- .../tserver/compactions/CompactionService.java | 6 +- .../compactions/InternalCompactionExecutor.java | 8 ++- .../org/apache/accumulo/tserver/log/LogSorter.java | 3 +- .../accumulo/tserver/log/TabletServerLogger.java | 3 +- .../accumulo/test/BalanceWithOfflineTableIT.java | 4 +- .../test/functional/BatchWriterFlushIT.java | 2 +- 33 files changed, 252 insertions(+), 98 deletions(-) diff --cc assemble/bin/accumulo-cluster index 5c4c50af52,392927c4ac..aee3ad2b4b --- a/assemble/bin/accumulo-cluster +++ b/assemble/bin/accumulo-cluster @@@ -244,10 -252,10 +244,10 @@@ function stop_sservers() G="SSERVER_HOSTS_${group}" if [[ -n ${!G} ]]; then for sserver in ${!G}; do - stop_service "$sserver" sserver "-g" "$group" + stop_service "$sserver" sserver "-o" "sserver.group=$group" done else - echo "${queue} is not a valid resource group ...exiting" + echo "${group} is not a valid resource group ...exiting" fi done } diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java index 654135a852,ceaf6901a5..00ebbeb363 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java @@@ -22,6 -21,8 +22,7 @@@ import static com.google.common.util.co import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly; + import static org.apache.accumulo.core.util.threads.ThreadPoolNames.CONDITIONAL_WRITER_POOL; import java.nio.ByteBuffer; import java.util.ArrayList; @@@ -375,9 -378,9 +376,9 @@@ public class ConditionalWriterImpl impl this.config = config; this.context = context; this.auths = config.getAuthorizations(); - this.ve = new VisibilityEvaluator(config.getAuthorizations()); + this.accessEvaluator = AccessEvaluator.of(config.getAuthorizations().toAccessAuthorizations()); this.threadPool = context.threadPools().createScheduledExecutorService( - config.getMaxWriteThreads(), this.getClass().getSimpleName()); + config.getMaxWriteThreads(), CONDITIONAL_WRITER_POOL.poolName); this.locator = new SyncingTabletLocator(context, tableId); this.serverQueues = new HashMap<>(); this.tableId = tableId; diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java index 215f7c6214,8c73dab8e5..eeb921d7ca --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java @@@ -25,8 -28,8 +25,9 @@@ import static org.apache.accumulo.core. import static org.apache.accumulo.core.rpc.ThriftUtil.createTransport; import static org.apache.accumulo.core.rpc.ThriftUtil.getClient; import static org.apache.accumulo.core.rpc.ThriftUtil.returnClient; + import static org.apache.accumulo.core.util.threads.ThreadPoolNames.INSTANCE_OPS_COMPACTIONS_FINDER_POOL; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.ConcurrentModificationException; diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java index 6b94fd81cf,c3b51237b6..91a1248c97 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java @@@ -27,9 -27,10 +27,10 @@@ import static java.util.concurrent.Time import static java.util.stream.Collectors.toSet; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; -import static org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly; +import static org.apache.accumulo.core.util.LazySingletons.RANDOM; import static org.apache.accumulo.core.util.Validators.EXISTING_TABLE_NAME; import static org.apache.accumulo.core.util.Validators.NEW_TABLE_NAME; + import static org.apache.accumulo.core.util.threads.ThreadPoolNames.SPLIT_POOL; import java.io.BufferedReader; import java.io.FileNotFoundException; diff --cc core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java index fd8d4cb6dd,0801371d48..2cdd3f213e --- a/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java +++ b/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java @@@ -19,7 -19,7 +19,8 @@@ package org.apache.accumulo.core.file; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.accumulo.core.util.LazySingletons.RANDOM; + import static org.apache.accumulo.core.util.threads.ThreadPoolNames.BLOOM_LOADER_POOL; import java.io.DataInputStream; import java.io.DataOutputStream; diff --cc core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java index bbcc3f1529,b3e4a99a7c..c993cf4cb1 --- a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java @@@ -18,6 -18,10 +18,9 @@@ */ package org.apache.accumulo.core.util.compaction; -import static java.nio.charset.StandardCharsets.UTF_8; + import static org.apache.accumulo.core.util.threads.ThreadPoolNames.COMPACTOR_RUNNING_COMPACTIONS_POOL; + import static org.apache.accumulo.core.util.threads.ThreadPoolNames.COMPACTOR_RUNNING_COMPACTION_IDS_POOL; + import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; diff --cc core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java index 0000000000,bdebd03b2d..a27ae864ea mode 000000,100644..100644 --- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java +++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java @@@ -1,0 -1,78 +1,70 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.accumulo.core.util.threads; + + public enum ThreadPoolNames { + + ACCUMULO_POOL_PREFIX("accumulo.pool"), + BATCH_WRITER_SEND_POOL("accumulo.pool.batch.writer.send"), + BATCH_WRITER_BIN_MUTATIONS_POOL("accumulo.pool.batch.writer.bin.mutations"), + BLOOM_LOADER_POOL("accumulo.pool.bloom.loader"), + BULK_IMPORT_CLIENT_LOAD_POOL("accumulo.pool.bulk.import.client.bulk.load"), + BULK_IMPORT_CLIENT_BULK_THREADS_POOL("accumulo.pool.bulk.import.client.bulk.threads"), + BULK_IMPORT_DIR_MOVE_POOL("accumulo.pool.bulk.dir.move"), - COMPACTION_COORDINATOR_SUMMARY_POOL("accumulo.pool.compaction.summary.gatherer"), + COMPACTION_SERVICE_COMPACTION_PLANNER_POOL("accumulo.pool.compaction.service.compaction.planner"), + COMPACTOR_RUNNING_COMPACTIONS_POOL("accumulo.pool.compactor.running.compactions"), + COMPACTOR_RUNNING_COMPACTION_IDS_POOL("accumulo.pool.compactor.running.compaction.ids"), + CONDITIONAL_WRITER_POOL("accumulo.pool.conditional.writer"), + CONDITIONAL_WRITER_CLEANUP_POOL("accumulo.pool.client.context.conditional.writer.cleanup"), + COORDINATOR_FINALIZER_BACKGROUND_POOL("accumulo.pool.compaction.finalizer.background.pool"), + COORDINATOR_FINALIZER_NOTIFIER_POOL("accumulo.pool.compaction.coordinator.compaction.finalizer"), + GC_DELETE_POOL("accumulo.pool.gc.threads.delete"), + GENERAL_SERVER_POOL("accumulo.pool.general.server"), - GENERAL_SERVER_SIMPLETIMER_POOL("accumulo.pool.general.server.simpletimer"), + IMPORT_TABLE_RENAME_POOL("accumulo.pool.import.table.rename"), + INSTANCE_OPS_COMPACTIONS_FINDER_POOL("accumulo.pool.instance.ops.active.compactions.finder"), - MANAGER_BULK_IMPORT_POOL("accumulo.pool.manager.bulk.import"), + MANAGER_FATE_POOL("accumulo.pool.manager.fate"), - MANAGER_RENAME_POOL("accumulo.pool.manager.rename"), + MANAGER_STATUS_POOL("accumulo.pool.manager.status"), + MANAGER_UPGRADE_COORDINATOR_METADATA_POOL("accumulo.pool.manager.upgrade.metadata"), + METADATA_DEFAULT_SPLIT_POOL("accumulo.pool.metadata.tablet.default.splitter"), + METADATA_TABLET_MIGRATION_POOL("accumulo.pool.metadata.tablet.migration"), + METADATA_TABLET_ASSIGNMENT_POOL("accumulo.pool.metadata.tablet.assignment"), - REPLICATION_WORKER_POOL("accumulo.pool.replication.worker"), - SCAN_POOL("accumulo.pool.scan"), + SCAN_SERVER_TABLET_METADATA_CACHE_POOL("accumulo.pool.scan.server.tablet.metadata.cache"), + SCANNER_READ_AHEAD_POOL("accumulo.pool.client.context.scanner.read.ahead"), + SCHED_FUTURE_CHECKER_POOL("accumulo.pool.scheduled.future.checker"), + SPLIT_POOL("accumulo.pool.table.ops.add.splits"), + TABLET_ASSIGNMENT_POOL("accumulo.pool.tablet.assignment.pool"), + TSERVER_ASSIGNMENT_POOL("accumulo.pool.tserver.assignment"), - TSERVER_COMPACTION_MINOR_POOL("accumulo.pool.tserver.compaction.minor"), + TSERVER_MIGRATIONS_POOL("accumulo.pool.tserver.migrations"), + TSERVER_MINOR_COMPACTOR_POOL("accumulo.pool.tserver.minor.compactor"), + TSERVER_SUMMARY_FILE_RETRIEVER_POOL("accumulo.pool.tserver.summary.file.retriever.pool"), + TSERVER_SUMMARY_PARTITION_POOL("accumulo.pool.tserver.summary.partition"), + TSERVER_SUMMARY_REMOTE_POOL("accumulo.pool.tserver.summary.remote"), + TSERVER_SUMMARY_RETRIEVAL_POOL("accumulo.pool.tserver.summary.retrieval"), + TSERVER_TABLET_MIGRATION_POOL("accumulo.pool.tserver.tablet.migration"), + TSERVER_WAL_CREATOR_POOL("accumulo.pool.tserver.wal.creator"), + TSERVER_WAL_SORT_CONCURRENT_POOL("accumulo.pool.tserver.wal.sort.concurrent"), - TSERVER_WORKQ_POOL("accumulo.pool.tserver.workq"), + UTILITY_CHECK_FILE_TASKS("accumulo.pool.util.check.file.tasks"), + UTILITY_VERIFY_TABLET_ASSIGNMENTS("accumulo.pool.util.check.tablet.servers"); + + public final String poolName; + + ThreadPoolNames(String poolName) { + this.poolName = poolName; + } + } diff --cc core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java index 6593237ab1,b2b0bc02db..3914c8da90 --- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java +++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java @@@ -21,6 -21,23 +21,18 @@@ package org.apache.accumulo.core.util.t import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; + import static org.apache.accumulo.core.util.threads.ThreadPoolNames.ACCUMULO_POOL_PREFIX; + import static org.apache.accumulo.core.util.threads.ThreadPoolNames.GC_DELETE_POOL; + import static org.apache.accumulo.core.util.threads.ThreadPoolNames.GENERAL_SERVER_POOL; -import static org.apache.accumulo.core.util.threads.ThreadPoolNames.GENERAL_SERVER_SIMPLETIMER_POOL; -import static org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_BULK_IMPORT_POOL; + import static org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_FATE_POOL; -import static org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_RENAME_POOL; + import static org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_STATUS_POOL; -import static org.apache.accumulo.core.util.threads.ThreadPoolNames.REPLICATION_WORKER_POOL; + import static org.apache.accumulo.core.util.threads.ThreadPoolNames.SCHED_FUTURE_CHECKER_POOL; + import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_ASSIGNMENT_POOL; + import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_MIGRATIONS_POOL; + import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_MINOR_COMPACTOR_POOL; + import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_PARTITION_POOL; + import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_REMOTE_POOL; + import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_RETRIEVAL_POOL; -import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_WORKQ_POOL; import java.lang.Thread.UncaughtExceptionHandler; import java.util.Iterator; @@@ -267,11 -288,28 +279,11 @@@ public class ThreadPools final Property p, boolean emitThreadPoolMetrics) { ThreadPoolExecutorBuilder builder; switch (p) { - case GENERAL_SIMPLETIMER_THREADPOOL_SIZE: - return createScheduledExecutorService(conf.getCount(p), - GENERAL_SERVER_SIMPLETIMER_POOL.poolName); case GENERAL_THREADPOOL_SIZE: - return createScheduledExecutorService(conf.getCount(p), "GeneralExecutor", + return createScheduledExecutorService(conf.getCount(p), GENERAL_SERVER_POOL.poolName, emitThreadPoolMetrics); - case MANAGER_BULK_THREADPOOL_SIZE: - builder = - getPoolBuilder(MANAGER_BULK_IMPORT_POOL).numCoreThreads(conf.getCount(p)).withTimeOut( - conf.getTimeInMillis(Property.MANAGER_BULK_THREADPOOL_TIMEOUT), MILLISECONDS); - if (emitThreadPoolMetrics) { - builder.enableThreadPoolMetrics(); - } - return builder.build(); - case MANAGER_RENAME_THREADS: - builder = getPoolBuilder(MANAGER_RENAME_POOL).numCoreThreads(conf.getCount(p)); - if (emitThreadPoolMetrics) { - builder.enableThreadPoolMetrics(); - } - return builder.build(); case MANAGER_FATE_THREADPOOL_SIZE: - builder = getPoolBuilder("Repo Runner").numCoreThreads(conf.getCount(p)); + builder = getPoolBuilder(MANAGER_FATE_POOL).numCoreThreads(conf.getCount(p)); if (emitThreadPoolMetrics) { builder.enableThreadPoolMetrics(); } @@@ -289,9 -327,15 +301,9 @@@ builder.enableThreadPoolMetrics(); } return builder.build(); - case TSERV_WORKQ_THREADS: - builder = getPoolBuilder(TSERVER_WORKQ_POOL).numCoreThreads(conf.getCount(p)); - if (emitThreadPoolMetrics) { - builder.enableThreadPoolMetrics(); - } - return builder.build(); case TSERV_MINC_MAXCONCURRENT: - builder = getPoolBuilder("minor compactor").numCoreThreads(conf.getCount(p)).withTimeOut(0L, - MILLISECONDS); + builder = getPoolBuilder(TSERVER_MINOR_COMPACTOR_POOL).numCoreThreads(conf.getCount(p)) + .withTimeOut(0L, MILLISECONDS); if (emitThreadPoolMetrics) { builder.enableThreadPoolMetrics(); } @@@ -332,7 -376,14 +344,7 @@@ } return builder.build(); case GC_DELETE_THREADS: - return getPoolBuilder("deleting").numCoreThreads(conf.getCount(p)).build(); + return getPoolBuilder(GC_DELETE_POOL).numCoreThreads(conf.getCount(p)).build(); - case REPLICATION_WORKER_THREADS: - builder = getPoolBuilder(REPLICATION_WORKER_POOL).numCoreThreads(conf.getCount(p)); - if (emitThreadPoolMetrics) { - builder.enableThreadPoolMetrics(); - } - return builder.build(); - default: throw new IllegalArgumentException("Unhandled thread pool property: " + p); } diff --cc server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java index 84e69d561f,5445d6d3da..a463707fec --- a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java @@@ -19,8 -19,7 +19,9 @@@ package org.apache.accumulo.manager.upgrade; import static java.util.concurrent.TimeUnit.SECONDS; + import static org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_UPGRADE_COORDINATOR_METADATA_POOL; +import static org.apache.accumulo.server.AccumuloDataVersion.REMOVE_DEPRECATIONS_FOR_VERSION_3; +import static org.apache.accumulo.server.AccumuloDataVersion.ROOT_TABLET_META_CHANGES; import java.io.IOException; import java.util.Collections; diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index 9840df50cf,18cb23b37b..f684b72291 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@@ -18,7 -18,9 +18,8 @@@ */ package org.apache.accumulo.tserver; -import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly; +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; + import static org.apache.accumulo.core.util.threads.ThreadPoolNames.SCAN_SERVER_TABLET_METADATA_CACHE_POOL; import java.io.IOException; import java.io.UncheckedIOException; diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java index 2bb80309a1,655b54fdbd..46a96e6b5c --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java @@@ -23,6 -22,18 +23,17 @@@ import static java.util.Objects.require import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.toUnmodifiableMap; -import static org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly; + import static org.apache.accumulo.core.util.threads.ThreadPoolNames.ACCUMULO_POOL_PREFIX; + import static org.apache.accumulo.core.util.threads.ThreadPoolNames.METADATA_DEFAULT_SPLIT_POOL; + import static org.apache.accumulo.core.util.threads.ThreadPoolNames.METADATA_TABLET_ASSIGNMENT_POOL; + import static org.apache.accumulo.core.util.threads.ThreadPoolNames.METADATA_TABLET_MIGRATION_POOL; + import static org.apache.accumulo.core.util.threads.ThreadPoolNames.SPLIT_POOL; + import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TABLET_ASSIGNMENT_POOL; + import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_MINOR_COMPACTOR_POOL; + import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_FILE_RETRIEVER_POOL; + import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_PARTITION_POOL; + import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_REMOTE_POOL; + import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_TABLET_MIGRATION_POOL; import java.io.IOException; import java.util.ArrayList; diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java index 2b5a6c6135,32a249e7b6..ec86d82a25 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java @@@ -291,9 -294,11 +292,9 @@@ public class LogSorter } public void startWatchingForRecoveryLogs() throws KeeperException, InterruptedException { - @SuppressWarnings("deprecation") - int threadPoolSize = this.conf.getCount(this.conf - .resolve(Property.TSERV_WAL_SORT_MAX_CONCURRENT, Property.TSERV_RECOVERY_MAX_CONCURRENT)); + int threadPoolSize = this.conf.getCount(Property.TSERV_WAL_SORT_MAX_CONCURRENT); ThreadPoolExecutor threadPool = - ThreadPools.getServerThreadPools().getPoolBuilder(this.getClass().getName()) + ThreadPools.getServerThreadPools().getPoolBuilder(TSERVER_WAL_SORT_CONCURRENT_POOL) .numCoreThreads(threadPoolSize).enableThreadPoolMetrics().build(); new DistributedWorkQueue(context.getZooKeeperRoot() + Constants.ZRECOVERY, sortedLogConf, context).startProcessing(new LogProcessor(), threadPool); diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java index 54dad2b243,6757d276ee..544f53acaa --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java @@@ -257,22 -263,60 +258,22 @@@ public class TabletServerLogger if (nextLogMaker != null) { return; } - nextLogMaker = ThreadPools.getServerThreadPools().getPoolBuilder("WALog creator") + nextLogMaker = ThreadPools.getServerThreadPools().getPoolBuilder(TSERVER_WAL_CREATOR_POOL) - .numCoreThreads(1).build(); - nextLogMaker.execute(new Runnable() { - @Override - public void run() { - final ServerResources conf = tserver.getServerConfig(); - final VolumeManager fs = conf.getVolumeManager(); - while (!nextLogMaker.isShutdown()) { - log.debug("Creating next WAL"); - DfsLogger alog = null; - - try { - alog = new DfsLogger(tserver.getContext(), conf, syncCounter, flushCounter); - alog.open(tserver.getClientAddressString()); - } catch (Exception t) { - log.error("Failed to open WAL", t); - // the log is not advertised in ZK yet, so we can just delete it if it exists - if (alog != null) { - try { - alog.close(); - } catch (Exception e) { - log.error("Failed to close WAL after it failed to open", e); - } - - try { - Path path = alog.getPath(); - if (fs.exists(path)) { - fs.delete(path); - } - } catch (Exception e) { - log.warn("Failed to delete a WAL that failed to open", e); - } - } - - try { - nextLog.offer(t, 12, TimeUnit.HOURS); - } catch (InterruptedException ex) { - // ignore - } - - continue; - } - - String fileName = alog.getFileName(); - log.debug("Created next WAL {}", fileName); - - try { - tserver.addNewLogMarker(alog); - } catch (Exception t) { - log.error("Failed to add new WAL marker for " + fileName, t); + .numCoreThreads(1).enableThreadPoolMetrics().build(); + nextLogMaker.execute(() -> { + final VolumeManager fs = tserver.getVolumeManager(); + while (!nextLogMaker.isShutdown()) { + log.debug("Creating next WAL"); + DfsLogger alog = null; + try { + alog = DfsLogger.createNew(tserver.getContext(), syncCounter, flushCounter, + tserver.getClientAddressString()); + } catch (Exception t) { + log.error("Failed to open WAL", t); + // the log is not advertised in ZK yet, so we can just delete it if it exists + if (alog != null) { try { - // Intentionally not deleting walog because it may have been advertised in ZK. See - // #949 alog.close(); } catch (Exception e) { log.error("Failed to close WAL after it failed to open", e);