This is an automated email from the ASF dual-hosted git repository. edcoleman pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 2f4b0372512b3cf677e7a66d4b5ee8482a36a0da Merge: ce83488245 0ad96b1dc0 Author: Ed Coleman <edcole...@apache.org> AuthorDate: Tue Mar 19 20:31:36 2024 +0000 Merge remote-tracking branch 'upstream/main' into elasticity .../accumulo/core/clientImpl/ClientContext.java | 9 +- .../core/clientImpl/ConditionalWriterImpl.java | 2 +- .../core/clientImpl/InstanceOperationsImpl.java | 4 +- .../core/clientImpl/TableOperationsImpl.java | 4 +- .../core/clientImpl/TabletServerBatchReader.java | 5 +- .../core/clientImpl/TabletServerBatchWriter.java | 8 +- .../accumulo/core/clientImpl/bulk/BulkImport.java | 8 +- .../accumulo/core/file/BloomFilterLayer.java | 5 +- .../file/blockfile/cache/lru/LruBlockCache.java | 4 +- .../blockfile/cache/tinylfu/TinyLfuBlockCache.java | 2 +- .../util/compaction/ExternalCompactionUtil.java | 9 +- .../accumulo/core/util/threads/ThreadPools.java | 379 +++++++++++++-------- .../core/file/rfile/MultiThreadedRFileTest.java | 7 +- .../threads/ThreadPoolExecutorBuilderTest.java | 79 +++++ .../server/conf/ServerConfigurationFactory.java | 4 +- .../conf/store/impl/PropCacheCaffeineImpl.java | 7 +- .../server/conf/store/impl/PropStoreWatcher.java | 5 +- .../accumulo/server/fs/VolumeManagerImpl.java | 4 +- .../accumulo/server/problems/ProblemReports.java | 7 +- .../apache/accumulo/server/rpc/TServerUtils.java | 7 +- .../server/util/RemoveEntriesForMissingFiles.java | 4 +- .../server/util/VerifyTabletAssignments.java | 5 +- .../server/conf/store/impl/ReadyMonitorTest.java | 4 +- .../main/java/org/apache/accumulo/gc/GCRun.java | 2 +- .../compaction/coordinator/QueueMetrics.java | 4 +- .../accumulo/manager/metrics/fate/FateMetrics.java | 4 +- .../accumulo/manager/recovery/RecoveryManager.java | 4 +- .../apache/accumulo/manager/split/Splitter.java | 5 +- .../manager/tableOps/bulkVer2/TabletRefresher.java | 2 +- .../manager/upgrade/UpgradeCoordinator.java | 8 +- .../tserver/TabletServerResourceManager.java | 21 +- .../org/apache/accumulo/tserver/log/LogSorter.java | 5 +- .../accumulo/tserver/log/TabletServerLogger.java | 4 +- .../accumulo/test/BalanceWithOfflineTableIT.java | 5 +- .../test/functional/BatchWriterFlushIT.java | 2 +- 35 files changed, 409 insertions(+), 229 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java index c009e14b14,02aeef6bf6..bf650a077f --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java @@@ -372,8 -372,8 +372,8 @@@ public class ConditionalWriterImpl impl this.auths = config.getAuthorizations(); this.ve = new VisibilityEvaluator(config.getAuthorizations()); this.threadPool = context.threadPools().createScheduledExecutorService( - config.getMaxWriteThreads(), this.getClass().getSimpleName(), false); + config.getMaxWriteThreads(), this.getClass().getSimpleName()); - this.locator = new SyncingTabletLocator(context, tableId); + this.locator = new SyncingClientTabletCache(context, tableId); this.serverQueues = new HashMap<>(); this.tableId = tableId; this.tableName = tableName; diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java index 297ed70495,6b94fd81cf..2ea43612f1 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java @@@ -448,74 -422,109 +448,76 @@@ public class TableOperationsImpl extend } } - private static class SplitEnv { - private final String tableName; - private final TableId tableId; - private final ExecutorService executor; - private final CountDownLatch latch; - private final AtomicReference<Exception> exception; - - SplitEnv(String tableName, TableId tableId, ExecutorService executor, CountDownLatch latch, - AtomicReference<Exception> exception) { - this.tableName = tableName; - this.tableId = tableId; - this.executor = executor; - this.latch = latch; - this.exception = exception; - } - } - - private class SplitTask implements Runnable { - - private List<Text> splits; - private SplitEnv env; + /** + * On the server side the fate operation will exit w/o an error if the tablet requested to split + * does not exist. When this happens it will also return an empty string. In the case where the + * fate operation successfully splits the tablet it will return the following string. This code + * uses this return value to see if it needs to retry finding the tablet. + */ + public static final String SPLIT_SUCCESS_MSG = "SPLIT_SUCCEEDED"; - SplitTask(SplitEnv env, List<Text> splits) { - this.env = env; - this.splits = splits; - } + @Override + public void addSplits(String tableName, SortedSet<Text> splits) + throws AccumuloException, TableNotFoundException, AccumuloSecurityException { - @Override - public void run() { - try { - if (env.exception.get() != null) { - return; - } + EXISTING_TABLE_NAME.validate(tableName); - if (splits.size() <= 2) { - addSplits(env, new TreeSet<>(splits)); - splits.forEach(s -> env.latch.countDown()); - return; - } + TableId tableId = context.getTableId(tableName); - int mid = splits.size() / 2; + // TODO should there be a server side check for this? + context.requireNotOffline(tableId, tableName); - // split the middle split point to ensure that child task split - // different tablets and can therefore run in parallel - addSplits(env, new TreeSet<>(splits.subList(mid, mid + 1))); - env.latch.countDown(); + ClientTabletCache tabLocator = ClientTabletCache.getInstance(context, tableId); - env.executor.execute(new SplitTask(env, splits.subList(0, mid))); - env.executor.execute(new SplitTask(env, splits.subList(mid + 1, splits.size()))); + SortedSet<Text> splitsTodo = new TreeSet<>(splits); - ExecutorService executor = context.threadPools().createFixedThreadPool(16, "addSplits", false); ++ ExecutorService executor = ++ context.threadPools().getPoolBuilder("addSplits").numCoreThreads(16).build(); + - } catch (Exception t) { - env.exception.compareAndSet(null, t); - } - } + try { + while (!splitsTodo.isEmpty()) { - } + tabLocator.invalidateCache(); - @Override - public void addSplits(String tableName, SortedSet<Text> partitionKeys) - throws TableNotFoundException, AccumuloException, AccumuloSecurityException { - EXISTING_TABLE_NAME.validate(tableName); + Map<KeyExtent,List<Text>> tabletSplits = + mapSplitsToTablets(tableName, tableId, tabLocator, splitsTodo); - TableId tableId = context.getTableId(tableName); - List<Text> splits = new ArrayList<>(partitionKeys); + List<Future<List<Text>>> splitTasks = new ArrayList<>(); - // should be sorted because we copied from a sorted set, but that makes - // assumptions about how the copy was done so resort to be sure. - Collections.sort(splits); - CountDownLatch latch = new CountDownLatch(splits.size()); - AtomicReference<Exception> exception = new AtomicReference<>(null); + for (Entry<KeyExtent,List<Text>> splitsForTablet : tabletSplits.entrySet()) { + Callable<List<Text>> splitTask = createSplitTask(tableName, splitsForTablet); + splitTasks.add(executor.submit(splitTask)); + } - ExecutorService executor = - context.threadPools().getPoolBuilder("addSplits").numCoreThreads(16).build(); - try { - executor.execute( - new SplitTask(new SplitEnv(tableName, tableId, executor, latch, exception), splits)); - - while (!latch.await(100, MILLISECONDS)) { - if (exception.get() != null) { - executor.shutdownNow(); - Throwable excep = exception.get(); - // Below all exceptions are wrapped and rethrown. This is done so that the user knows what - // code path got them here. If the wrapping was not done, the - // user would only have the stack trace for the background thread. - if (excep instanceof TableNotFoundException) { - TableNotFoundException tnfe = (TableNotFoundException) excep; - throw new TableNotFoundException(tableId.canonical(), tableName, - "Table not found by background thread", tnfe); - } else if (excep instanceof TableOfflineException) { - log.debug("TableOfflineException occurred in background thread. Throwing new exception", - excep); - throw new TableOfflineException(tableId, tableName); - } else if (excep instanceof AccumuloSecurityException) { - // base == background accumulo security exception - AccumuloSecurityException base = (AccumuloSecurityException) excep; - throw new AccumuloSecurityException(base.getUser(), base.asThriftException().getCode(), - base.getTableInfo(), excep); - } else if (excep instanceof AccumuloServerException) { - throw new AccumuloServerException((AccumuloServerException) excep); - } else if (excep instanceof Error) { - throw new Error(excep); - } else { - throw new AccumuloException(excep); + for (var future : splitTasks) { + try { + var completedSplits = future.get(); + completedSplits.forEach(splitsTodo::remove); + } catch (ExecutionException ee) { + Throwable excep = ee.getCause(); + // Below all exceptions are wrapped and rethrown. This is done so that the user knows + // what code path got them here. If the wrapping was not done, the user would only + // have the stack trace for the background thread. + if (excep instanceof TableNotFoundException) { + TableNotFoundException tnfe = (TableNotFoundException) excep; + throw new TableNotFoundException(tableId.canonical(), tableName, + "Table not found by background thread", tnfe); + } else if (excep instanceof TableOfflineException) { + log.debug( + "TableOfflineException occurred in background thread. Throwing new exception", + excep); + throw new TableOfflineException(tableId, tableName); + } else if (excep instanceof AccumuloSecurityException) { + // base == background accumulo security exception + AccumuloSecurityException base = (AccumuloSecurityException) excep; + throw new AccumuloSecurityException(base.getUser(), + base.asThriftException().getCode(), base.getTableInfo(), excep); + } else if (excep instanceof AccumuloServerException) { + throw new AccumuloServerException((AccumuloServerException) excep); + } else { + throw new AccumuloException(excep); + } + } catch (InterruptedException e) { + throw new IllegalStateException(e); } } } diff --cc core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java index 3656b99815,46a07682bd..48bf8cc999 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java @@@ -63,15 -62,15 +63,15 @@@ public final class TinyLfuBlockCache im private final Policy.Eviction<String,Block> policy; private final int maxSize; private final ScheduledExecutorService statsExecutor = ThreadPools.getServerThreadPools() - .createScheduledExecutorService(1, "TinyLfuBlockCacheStatsExecutor", false); + .createScheduledExecutorService(1, "TinyLfuBlockCacheStatsExecutor"); public TinyLfuBlockCache(Configuration conf, CacheType type) { - cache = Caffeine.newBuilder() + cache = Caches.getInstance().createNewBuilder(CacheName.TINYLFU_BLOCK_CACHE, false) .initialCapacity((int) Math.ceil(1.2 * conf.getMaxSize(type) / conf.getBlockSize())) - .weigher((String blockName, Block block) -> { + .recordStats().weigher((String blockName, Block block) -> { int keyWeight = ClassSize.align(blockName.length()) + ClassSize.STRING; return keyWeight + block.weight(); - }).maximumWeight(conf.getMaxSize(type)).recordStats().build(); + }).maximumWeight(conf.getMaxSize(type)).build(); policy = cache.policy().eviction().orElseThrow(); maxSize = (int) Math.min(Integer.MAX_VALUE, policy.getMaximum()); ScheduledFuture<?> future = statsExecutor.scheduleAtFixedRate(this::logStats, STATS_PERIOD_SEC, diff --cc core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java index 0d42f659c8,35c358b7ed..86181247a8 --- a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java @@@ -218,12 -218,11 +218,12 @@@ public class ExternalCompactionUtil */ public static List<RunningCompaction> getCompactionsRunningOnCompactors(ClientContext context) { final List<RunningCompactionFuture> rcFutures = new ArrayList<>(); - final ExecutorService executor = ThreadPools.getServerThreadPools().createFixedThreadPool(16, - "CompactorRunningCompactions", false); + final ExecutorService executor = ThreadPools.getServerThreadPools() + .getPoolBuilder("CompactorRunningCompactions").numCoreThreads(16).build(); - getCompactorAddrs(context).forEach((q, hp) -> { + + getCompactorAddrs(context).forEach((group, hp) -> { hp.forEach(hostAndPort -> { - rcFutures.add(new RunningCompactionFuture(q, hostAndPort, + rcFutures.add(new RunningCompactionFuture(group, hostAndPort, executor.submit(() -> getRunningCompaction(hostAndPort, context)))); }); }); diff --cc core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java index ad5aa4710b,334fb46a53..d520c01d87 --- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java +++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java @@@ -251,35 -269,74 +269,68 @@@ public class ThreadPools return createScheduledExecutorService(conf.getCount(p), "GeneralExecutor", emitThreadPoolMetrics); case MANAGER_FATE_THREADPOOL_SIZE: - return createFixedThreadPool(conf.getCount(p), "Repo Runner", emitThreadPoolMetrics); + builder = getPoolBuilder("Repo Runner").numCoreThreads(conf.getCount(p)); + if (emitThreadPoolMetrics) { + builder.enableThreadPoolMetrics(); + } + return builder.build(); case MANAGER_STATUS_THREAD_POOL_SIZE: + builder = getPoolBuilder("GatherTableInformation"); int threads = conf.getCount(p); if (threads == 0) { - return createThreadPool(0, Integer.MAX_VALUE, 60L, SECONDS, "GatherTableInformation", - new SynchronousQueue<>(), emitThreadPoolMetrics); + builder.numCoreThreads(0).numMaxThreads(Integer.MAX_VALUE).withTimeOut(60L, SECONDS) + .withQueue(new SynchronousQueue<>()); } else { - return createFixedThreadPool(threads, "GatherTableInformation", emitThreadPoolMetrics); + builder.numCoreThreads(threads); } + if (emitThreadPoolMetrics) { + builder.enableThreadPoolMetrics(); + } + return builder.build(); - case TSERV_WORKQ_THREADS: - builder = getPoolBuilder("distributed work queue").numCoreThreads(conf.getCount(p)); - if (emitThreadPoolMetrics) { - builder.enableThreadPoolMetrics(); - } - return builder.build(); case TSERV_MINC_MAXCONCURRENT: - return createFixedThreadPool(conf.getCount(p), 0L, MILLISECONDS, "minor compactor", - emitThreadPoolMetrics); + builder = getPoolBuilder("minor compactor").numCoreThreads(conf.getCount(p)).withTimeOut(0L, + MILLISECONDS); + if (emitThreadPoolMetrics) { + builder.enableThreadPoolMetrics(); + } + return builder.build(); case TSERV_MIGRATE_MAXCONCURRENT: - return createFixedThreadPool(conf.getCount(p), 0L, MILLISECONDS, "tablet migration", - emitThreadPoolMetrics); + builder = getPoolBuilder("tablet migration").numCoreThreads(conf.getCount(p)) + .withTimeOut(0L, MILLISECONDS); + if (emitThreadPoolMetrics) { + builder.enableThreadPoolMetrics(); + } + return builder.build(); case TSERV_ASSIGNMENT_MAXCONCURRENT: - return createFixedThreadPool(conf.getCount(p), 0L, MILLISECONDS, "tablet assignment", - emitThreadPoolMetrics); + builder = getPoolBuilder("tablet assignment").numCoreThreads(conf.getCount(p)) + .withTimeOut(0L, MILLISECONDS); + if (emitThreadPoolMetrics) { + builder.enableThreadPoolMetrics(); + } + return builder.build(); case TSERV_SUMMARY_RETRIEVAL_THREADS: - return createThreadPool(conf.getCount(p), conf.getCount(p), 60, SECONDS, - "summary file retriever", emitThreadPoolMetrics); + builder = getPoolBuilder("summary file retriever").numCoreThreads(conf.getCount(p)) + .withTimeOut(60L, MILLISECONDS); + if (emitThreadPoolMetrics) { + builder.enableThreadPoolMetrics(); + } + return builder.build(); case TSERV_SUMMARY_REMOTE_THREADS: - return createThreadPool(conf.getCount(p), conf.getCount(p), 60, SECONDS, "summary remote", - emitThreadPoolMetrics); + builder = getPoolBuilder("summary remote").numCoreThreads(conf.getCount(p)).withTimeOut(60L, + MILLISECONDS); + if (emitThreadPoolMetrics) { + builder.enableThreadPoolMetrics(); + } + return builder.build(); case TSERV_SUMMARY_PARTITION_THREADS: - return createThreadPool(conf.getCount(p), conf.getCount(p), 60, SECONDS, - "summary partition", emitThreadPoolMetrics); + builder = getPoolBuilder("summary partition").numCoreThreads(conf.getCount(p)) + .withTimeOut(60L, MILLISECONDS); + if (emitThreadPoolMetrics) { + builder.enableThreadPoolMetrics(); + } + return builder.build(); case GC_DELETE_THREADS: - return createFixedThreadPool(conf.getCount(p), "deleting", emitThreadPoolMetrics); + return getPoolBuilder("deleting").numCoreThreads(conf.getCount(p)).build(); default: throw new IllegalArgumentException("Unhandled thread pool property: " + p); } diff --cc server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java index 7607d5ceb9,40cdb5811f..f56e7f1622 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java @@@ -324,10 -321,10 +324,10 @@@ public class VolumeManagerImpl implemen @Override public void bulkRename(Map<Path,Path> oldToNewPathMap, int poolSize, String poolName, - String transactionId) throws IOException { + FateId fateId) throws IOException { List<Future<Void>> results = new ArrayList<>(); - ExecutorService workerPool = - ThreadPools.getServerThreadPools().createFixedThreadPool(poolSize, poolName, false); + ExecutorService workerPool = ThreadPools.getServerThreadPools().getPoolBuilder(poolName) + .numCoreThreads(poolSize).build(); oldToNewPathMap.forEach((oldPath, newPath) -> results.add(workerPool.submit(() -> { boolean success; try { diff --cc server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/QueueMetrics.java index c5ba5157dc,0000000000..425bdf4937 mode 100644,000000..100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/QueueMetrics.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/QueueMetrics.java @@@ -1,155 -1,0 +1,155 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager.compaction.coordinator; + +import static org.apache.accumulo.core.metrics.MetricsUtil.formatString; +import static org.apache.accumulo.core.metrics.MetricsUtil.getCommonTags; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.metrics.MetricsProducer; +import org.apache.accumulo.core.spi.compaction.CompactorGroupId; +import org.apache.accumulo.core.util.threads.ThreadPools; +import org.apache.accumulo.manager.compaction.queue.CompactionJobPriorityQueue; +import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Sets; +import com.google.common.collect.Sets.SetView; + +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tags; + +public class QueueMetrics implements MetricsProducer { + + private static class QueueMeters { + private final Gauge length; + private final Gauge jobsQueued; + private final Gauge jobsDequeued; + private final Gauge jobsRejected; + private final Gauge jobsLowestPriority; + + public QueueMeters(MeterRegistry meterRegistry, CompactorGroupId cgid, + CompactionJobPriorityQueue queue) { + var queueId = formatString(cgid.canonical()); + + length = + Gauge.builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH, queue, q -> q.getMaxSize()) + .description("Length of priority queues") + .tags(Tags.concat(getCommonTags(), "queue.id", queueId)).register(meterRegistry); + + jobsQueued = Gauge + .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED, queue, q -> q.getQueuedJobs()) + .description("Count of queued jobs") + .tags(Tags.concat(getCommonTags(), "queue.id", queueId)).register(meterRegistry); + + jobsDequeued = Gauge + .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_DEQUEUED, queue, + q -> q.getDequeuedJobs()) + .description("Count of jobs dequeued") + .tags(Tags.concat(getCommonTags(), "queue.id", queueId)).register(meterRegistry); + + jobsRejected = Gauge + .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED, queue, + q -> q.getRejectedJobs()) + .description("Count of rejected jobs") + .tags(Tags.concat(getCommonTags(), "queue.id", queueId)).register(meterRegistry); + + jobsLowestPriority = Gauge + .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY, queue, + q -> q.getLowestPriority()) + .description("Lowest priority queued job") + .tags(Tags.concat(getCommonTags(), "queue.id", queueId)).register(meterRegistry); + } + + private void removeMeters(MeterRegistry registry) { + registry.remove(length); + registry.remove(jobsQueued); + registry.remove(jobsDequeued); + registry.remove(jobsRejected); + registry.remove(jobsLowestPriority); + } + } + + private static final Logger LOG = LoggerFactory.getLogger(QueueMetrics.class); + private static final long DEFAULT_MIN_REFRESH_DELAY = TimeUnit.SECONDS.toMillis(5); + private volatile MeterRegistry meterRegistry = null; + private final CompactionJobQueues compactionJobQueues; + private final Map<CompactorGroupId,QueueMeters> perQueueMetrics = new HashMap<>(); + private Gauge queueCountMeter = null; + + public QueueMetrics(CompactionJobQueues compactionJobQueues) { + this.compactionJobQueues = compactionJobQueues; - ScheduledExecutorService scheduler = ThreadPools.getServerThreadPools() - .createScheduledExecutorService(1, "queueMetricsPoller", false); ++ ScheduledExecutorService scheduler = ++ ThreadPools.getServerThreadPools().createScheduledExecutorService(1, "queueMetricsPoller"); + Runtime.getRuntime().addShutdownHook(new Thread(scheduler::shutdownNow)); + ThreadPools.watchNonCriticalScheduledTask(scheduler.scheduleAtFixedRate(this::update, + DEFAULT_MIN_REFRESH_DELAY, DEFAULT_MIN_REFRESH_DELAY, TimeUnit.MILLISECONDS)); + } + + public void update() { + + // read the volatile variable once so the rest of the method has consistent view + var localRegistry = meterRegistry; + + if (queueCountMeter == null) { + queueCountMeter = Gauge + .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUES, compactionJobQueues, + CompactionJobQueues::getQueueCount) + .description("Number of current Queues").tags(getCommonTags()).register(localRegistry); + } + LOG.debug("update - cjq queues: {}", compactionJobQueues.getQueueIds()); + + Set<CompactorGroupId> definedQueues = compactionJobQueues.getQueueIds(); + LOG.debug("update - defined queues: {}", definedQueues); + + // Copy the keySet into a new Set so that changes to perQueueMetrics + // don't affect the collection + Set<CompactorGroupId> queuesWithMetrics = new HashSet<>(perQueueMetrics.keySet()); + LOG.debug("update - queues with metrics: {}", queuesWithMetrics); + + SetView<CompactorGroupId> queuesWithoutMetrics = + Sets.difference(definedQueues, queuesWithMetrics); + queuesWithoutMetrics.forEach(q -> { + LOG.debug("update - creating meters for queue: {}", q); + perQueueMetrics.put(q, new QueueMeters(localRegistry, q, compactionJobQueues.getQueue(q))); + }); + + SetView<CompactorGroupId> metricsWithoutQueues = + Sets.difference(queuesWithMetrics, definedQueues); + metricsWithoutQueues.forEach(q -> { + LOG.debug("update - removing meters for queue: {}", q); + perQueueMetrics.get(q).removeMeters(localRegistry); + perQueueMetrics.remove(q); + }); + + } + + @Override + public void registerMetrics(MeterRegistry registry) { + this.meterRegistry = registry; + } +} diff --cc server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java index 69d485dd91,cb590a3642..d0eeb9b58d --- a/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java @@@ -69,13 -69,12 +69,13 @@@ public class RecoveryManager public RecoveryManager(Manager manager, long timeToCacheExistsInMillis) { this.manager = manager; - existenceCache = - Caffeine.newBuilder().expireAfterWrite(timeToCacheExistsInMillis, TimeUnit.MILLISECONDS) - .maximumWeight(10_000_000).weigher((path, exist) -> path.toString().length()).build(); + existenceCache = this.manager.getContext().getCaches() + .createNewBuilder(CacheName.RECOVERY_MANAGER_PATH_CACHE, true) + .expireAfterWrite(timeToCacheExistsInMillis, TimeUnit.MILLISECONDS) + .maximumWeight(10_000_000).weigher((path, exist) -> path.toString().length()).build(); - executor = ThreadPools.getServerThreadPools().createScheduledExecutorService(4, - "Walog sort starter", false); + executor = + ThreadPools.getServerThreadPools().createScheduledExecutorService(4, "Walog sort starter"); zooCache = new ZooCache(manager.getContext().getZooReader(), null); try { List<String> workIDs = diff --cc server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java index 3acdfe13bf,0000000000..d8e0a87a40 mode 100644,000000..100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java @@@ -1,120 -1,0 +1,121 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager.split; + +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.metadata.TabletFile; +import org.apache.accumulo.core.util.cache.Caches.CacheName; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.conf.TableConfiguration; +import org.apache.accumulo.server.util.FileUtil; +import org.apache.accumulo.server.util.FileUtil.FileInfo; + +import com.github.benmanes.caffeine.cache.CacheLoader; +import com.github.benmanes.caffeine.cache.LoadingCache; +import com.github.benmanes.caffeine.cache.Weigher; + +public class Splitter { + + private final ThreadPoolExecutor splitExecutor; + + private static class CacheKey { + + final TableId tableId; + final TabletFile tabletFile; + + public CacheKey(TableId tableId, TabletFile tabletFile) { + this.tableId = tableId; + this.tabletFile = tabletFile; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CacheKey cacheKey = (CacheKey) o; + return Objects.equals(tableId, cacheKey.tableId) + && Objects.equals(tabletFile, cacheKey.tabletFile); + } + + @Override + public int hashCode() { + return Objects.hash(tableId, tabletFile); + } + + } + + LoadingCache<CacheKey,FileInfo> splitFileCache; + + public Splitter(ServerContext context) { + int numThreads = context.getConfiguration().getCount(Property.MANAGER_SPLIT_WORKER_THREADS); + // Set up thread pool that constrains the amount of task it queues and when full discards task. + // The purpose of this is to avoid reading lots of data into memory if lots of tablets need to + // split. + BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(10000); - this.splitExecutor = context.threadPools().createThreadPool(numThreads, numThreads, 0, - TimeUnit.MILLISECONDS, "split_seeder", queue, true); ++ this.splitExecutor = context.threadPools().getPoolBuilder("split_seeder") ++ .numCoreThreads(numThreads).numMaxThreads(numThreads).withTimeOut(0L, TimeUnit.MILLISECONDS) ++ .withQueue(queue).enableThreadPoolMetrics().build(); + + // Discard task when the queue is full, this allows the TGW to continue processing task other + // than splits. + this.splitExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); + + Weigher<CacheKey, + FileInfo> weigher = (key, info) -> key.tableId.canonical().length() + + key.tabletFile.getPath().toString().length() + info.getFirstRow().getLength() + + info.getLastRow().getLength(); + + CacheLoader<CacheKey,FileInfo> loader = key -> { + TableConfiguration tableConf = context.getTableConfiguration(key.tableId); + return FileUtil.tryToGetFirstAndLastRows(context, tableConf, Set.of(key.tabletFile)) + .get(key.tabletFile); + }; + + splitFileCache = context.getCaches().createNewBuilder(CacheName.SPLITTER_FILES, true) + .expireAfterAccess(10, TimeUnit.MINUTES).maximumWeight(10_000_000L).weigher(weigher) + .build(loader); + + } + + public synchronized void start() {} + + public synchronized void stop() { + splitExecutor.shutdownNow(); + } + + public FileInfo getCachedFileInfo(TableId tableId, TabletFile tabletFile) { + return splitFileCache.get(new CacheKey(tableId, tabletFile)); + } + + public void initiateSplit(SeedSplitTask seedSplitTask) { + splitExecutor.execute(seedSplitTask); + } +} diff --cc server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java index d2bc8c9b91,0000000000..a5cdbe847f mode 100644,000000..100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java @@@ -1,190 -1,0 +1,190 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager.tableOps.bulkVer2; + +import static java.util.stream.Collectors.groupingBy; +import static java.util.stream.Collectors.mapping; +import static java.util.stream.Collectors.toList; + +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.function.Predicate; +import java.util.function.Supplier; + +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.metadata.TServerInstance; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; +import org.apache.accumulo.core.rpc.ThriftUtil; +import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; +import org.apache.accumulo.core.tabletserver.thrift.TabletServerClientService; +import org.apache.accumulo.core.trace.TraceUtil; +import org.apache.accumulo.core.util.Retry; +import org.apache.accumulo.server.ServerContext; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Iterators; + +public class TabletRefresher { + + private static final Logger log = LoggerFactory.getLogger(TabletRefresher.class); + + public static void refresh(ServerContext context, + Supplier<Set<TServerInstance>> onlineTserversSupplier, FateId fateId, TableId tableId, + byte[] startRow, byte[] endRow, Predicate<TabletMetadata> needsRefresh) { + + // ELASTICITY_TODO should this thread pool be configurable? + ThreadPoolExecutor threadPool = - context.threadPools().createFixedThreadPool(10, "Tablet refresh " + fateId, false); ++ context.threadPools().getPoolBuilder("Tablet refresh " + fateId).numCoreThreads(10).build(); + + try (var tablets = context.getAmple().readTablets().forTable(tableId) + .overlapping(startRow, endRow).checkConsistency() + .fetch(ColumnType.LOADED, ColumnType.LOCATION, ColumnType.PREV_ROW).build()) { + + // Find all tablets that need to refresh their metadata. There may be some tablets that were + // hosted after the tablet files were updated, it just results in an unneeded refresh + // request. There may also be tablets that had a location when the files were set but do not + // have a location now, that is ok the next time that tablet loads somewhere it will see the + // files. + + var tabletIterator = + tablets.stream().filter(tabletMetadata -> tabletMetadata.getLocation() != null) + .filter(needsRefresh).iterator(); + + // avoid reading all tablets into memory and instead process batches of 1000 tablets at a time + Iterators.partition(tabletIterator, 1000).forEachRemaining(batch -> { + var refreshesNeeded = batch.stream().collect(groupingBy(TabletMetadata::getLocation, + mapping(tabletMetadata -> tabletMetadata.getExtent().toThrift(), toList()))); + + refreshTablets(threadPool, fateId.canonical(), context, onlineTserversSupplier, + refreshesNeeded); + }); + + } finally { + threadPool.shutdownNow(); + } + + } + + public static void refreshTablets(ExecutorService threadPool, String logId, ServerContext context, + Supplier<Set<TServerInstance>> onlineTserversSupplier, + Map<TabletMetadata.Location,List<TKeyExtent>> refreshesNeeded) { + + // make a copy as it will be mutated in this method + refreshesNeeded = new HashMap<>(refreshesNeeded); + + Retry retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100)) + .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(1)).backOffFactor(1.5) + .logInterval(Duration.ofMinutes(3)).createRetry(); + + while (!refreshesNeeded.isEmpty()) { + + Map<TabletMetadata.Location,Future<List<TKeyExtent>>> futures = new HashMap<>(); + + for (Map.Entry<TabletMetadata.Location,List<TKeyExtent>> entry : refreshesNeeded.entrySet()) { + + // Ask tablet server to reload the metadata for these tablets. The tablet server returns + // the list of extents it was hosting but was unable to refresh (the tablets could be in + // the process of loading). If it is not currently hosting the tablet it treats that as + // refreshed and does not return anything for it. + Future<List<TKeyExtent>> future = threadPool + .submit(() -> sendSyncRefreshRequest(context, logId, entry.getKey(), entry.getValue())); + + futures.put(entry.getKey(), future); + } + + for (Map.Entry<TabletMetadata.Location,Future<List<TKeyExtent>>> entry : futures.entrySet()) { + TabletMetadata.Location location = entry.getKey(); + Future<List<TKeyExtent>> future = entry.getValue(); + + List<TKeyExtent> nonRefreshedExtents = null; + try { + nonRefreshedExtents = future.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + if (nonRefreshedExtents.isEmpty()) { + // tablet server was able to refresh everything, so remove that location + refreshesNeeded.remove(location); + } else { + // tablet server could not refresh some tablets, try them again later. + refreshesNeeded.put(location, nonRefreshedExtents); + } + } + + // look for any tservers that have died since we read the metadata table and remove them + if (!refreshesNeeded.isEmpty()) { + Set<TServerInstance> liveTservers = onlineTserversSupplier.get(); + + refreshesNeeded.keySet() + .removeIf(location -> !liveTservers.contains(location.getServerInstance())); + } + + if (!refreshesNeeded.isEmpty()) { + try { + retry.waitForNextAttempt(log, logId + " waiting for " + refreshesNeeded.size() + + " tservers to refresh their tablets metadata"); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + } + + private static List<TKeyExtent> sendSyncRefreshRequest(ServerContext context, String logId, + TabletMetadata.Location location, List<TKeyExtent> refreshes) { + TabletServerClientService.Client client = null; + try { + log.trace("{} sending refresh request to {} for {} extents", logId, location, + refreshes.size()); + var timeInMillis = context.getConfiguration().getTimeInMillis(Property.MANAGER_BULK_TIMEOUT); + client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, location.getHostAndPort(), + context, timeInMillis); + + var unrefreshed = client.refreshTablets(TraceUtil.traceInfo(), context.rpcCreds(), refreshes); + + log.trace("{} refresh request to {} returned {} unrefreshed extents", logId, location, + unrefreshed.size()); + + return unrefreshed; + } catch (TException ex) { + log.debug("rpc failed server: " + location + ", " + logId + " " + ex.getMessage(), ex); + + // ELASTICITY_TODO are there any other exceptions we should catch in this method and check if + // the tserver is till alive? + + // something went wrong w/ RPC return all extents as unrefreshed + return refreshes; + } finally { + ThriftUtil.returnClient(client, context); + } + } +} diff --cc server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java index a9ed1fd6be,84e69d561f..22fe50c0c9 --- a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java @@@ -18,7 -18,7 +18,8 @@@ */ package org.apache.accumulo.manager.upgrade; + import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.accumulo.server.AccumuloDataVersion.METADATA_FILE_JSON_ENCODING; import static org.apache.accumulo.server.AccumuloDataVersion.REMOVE_DEPRECATIONS_FOR_VERSION_3; import static org.apache.accumulo.server.AccumuloDataVersion.ROOT_TABLET_META_CHANGES; diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java index d8ea4d78c5,de47b841fd..0a86efdf7a --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java @@@ -302,8 -307,17 +305,9 @@@ public class TabletServerResourceManage () -> context.getConfiguration().getCount(Property.TSERV_MINC_MAXCONCURRENT), "minor compactor", minorCompactionThreadPool); - defaultMigrationPool = ThreadPools.getServerThreadPools().createThreadPool(0, 1, 60, - TimeUnit.SECONDS, "metadata tablet migration", true); - splitThreadPool = - ThreadPools.getServerThreadPools().getPoolBuilder("splitter").numCoreThreads(0) - .numMaxThreads(1).withTimeOut(1, SECONDS).enableThreadPoolMetrics().build(); - - defaultSplitThreadPool = - ThreadPools.getServerThreadPools().getPoolBuilder("md splitter").numCoreThreads(0) - .numMaxThreads(1).withTimeOut(60, SECONDS).enableThreadPoolMetrics().build(); - - defaultMigrationPool = ThreadPools.getServerThreadPools() - .getPoolBuilder("metadata tablet migration").numCoreThreads(0).numMaxThreads(1) - .withTimeOut(60, SECONDS).enableThreadPoolMetrics().build(); ++ defaultMigrationPool = ++ ThreadPools.getServerThreadPools().getPoolBuilder("metadata tablet migration") ++ .numCoreThreads(0).numMaxThreads(1).withTimeOut(60L, SECONDS).build(); migrationPool = ThreadPools.getServerThreadPools().createExecutorService(acuConf, Property.TSERV_MIGRATE_MAXCONCURRENT, true); diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java index dda829d0a8,2b5a6c6135..030720c93d --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java @@@ -291,30 -290,13 +291,31 @@@ public class LogSorter } } - public void startWatchingForRecoveryLogs() throws KeeperException, InterruptedException { - int threadPoolSize = this.conf.getCount(Property.TSERV_WAL_SORT_MAX_CONCURRENT); + /** + * Sort any logs that need sorting in the current thread. + * + * @return The time in millis when the next check can be done. + */ + public long sortLogsIfNeeded() throws KeeperException, InterruptedException { + DistributedWorkQueue dwq = new DistributedWorkQueue( + context.getZooKeeperRoot() + Constants.ZRECOVERY, sortedLogConf, context); + dwq.processExistingWork(new LogProcessor(), MoreExecutors.newDirectExecutorService(), 1, false); + return System.currentTimeMillis() + dwq.getCheckInterval(); + } + + /** + * Sort any logs that need sorting in a ThreadPool using + * {@link Property#TSERV_WAL_SORT_MAX_CONCURRENT} threads. This method will start a background + * thread to look for log sorting work in the future that will be processed by the + * ThreadPoolExecutor + */ + public void startWatchingForRecoveryLogs(int threadPoolSize) + throws KeeperException, InterruptedException { - ThreadPoolExecutor threadPool = ThreadPools.getServerThreadPools() - .createFixedThreadPool(threadPoolSize, this.getClass().getName(), true); + ThreadPoolExecutor threadPool = + ThreadPools.getServerThreadPools().getPoolBuilder(this.getClass().getName()) + .numCoreThreads(threadPoolSize).enableThreadPoolMetrics().build(); new DistributedWorkQueue(context.getZooKeeperRoot() + Constants.ZRECOVERY, sortedLogConf, - context).startProcessing(new LogProcessor(), threadPool); + context).processExistingAndFuture(new LogProcessor(), threadPool); } public List<RecoveryStatus> getLogSorts() {