This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 7fb1b5d Check scheduled tasks periodically for errors (#2524) 7fb1b5d is described below commit 7fb1b5df9e3abe18304039fb3e781b2617eec4b5 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Fri Mar 4 07:59:03 2022 -0500 Check scheduled tasks periodically for errors (#2524) Updated code that uses ScheduledExecutorService and was not retaining a reference to the ScheduledFuture object returned by the methods that create scheduled tasks. If the code was adding a non-scheduled task via the `submit` method, I changed the code to use the `execute` method. For code that was adding a scheduled task, but not doing anything with the ScheduledFuture, I modified the code to pass the ScheduledFuture reference to a utility method in ThreadPools. The new ThreadPools ut [...] --- .../DefaultContextClassLoaderFactory.java | 4 +- .../core/clientImpl/ConditionalWriterImpl.java | 7 +- .../core/clientImpl/TabletServerBatchWriter.java | 36 +++--- .../file/blockfile/cache/lru/LruBlockCache.java | 6 +- .../blockfile/cache/tinylfu/TinyLfuBlockCache.java | 5 +- .../org/apache/accumulo/core/summary/Gatherer.java | 10 +- .../util/ratelimit/SharedRateLimiterFactory.java | 10 +- .../accumulo/core/util/threads/ThreadPools.java | 124 +++++++++++++++++++++ .../main/java/org/apache/accumulo/fate/Fate.java | 4 +- .../core/file/rfile/MultiThreadedRFileTest.java | 2 +- pom.xml | 3 +- .../org/apache/accumulo/server/ServerContext.java | 4 +- .../accumulo/server/client/BulkImporter.java | 6 +- .../server/compaction/CompactionWatcher.java | 5 +- .../org/apache/accumulo/server/fs/FileManager.java | 6 +- .../accumulo/server/manager/LiveTServerSet.java | 6 +- .../apache/accumulo/server/rpc/TServerUtils.java | 30 ++--- .../accumulo/server/util/FileSystemMonitor.java | 26 ++--- .../server/util/RemoveEntriesForMissingFiles.java | 2 +- .../server/zookeeper/DistributedWorkQueue.java | 29 ++--- .../coordinator/CompactionCoordinator.java | 11 
+- .../accumulo/coordinator/CompactionFinalizer.java | 4 +- .../coordinator/DeadCompactionDetector.java | 5 +- .../org/apache/accumulo/compactor/Compactor.java | 11 +- .../java/org/apache/accumulo/manager/Manager.java | 13 ++- .../org/apache/accumulo/manager/ManagerTime.java | 6 +- .../manager/metrics/ReplicationMetrics.java | 6 +- .../accumulo/manager/metrics/fate/FateMetrics.java | 5 +- .../accumulo/manager/recovery/RecoveryManager.java | 9 +- .../util/logging/AccumuloMonitorAppender.java | 6 +- .../apache/accumulo/tserver/AssignmentHandler.java | 33 +++--- .../org/apache/accumulo/tserver/TabletServer.java | 77 +++++++------ .../tserver/TabletServerResourceManager.java | 11 +- .../accumulo/tserver/log/TabletServerLogger.java | 2 +- .../metrics/CompactionExecutorsMetrics.java | 7 +- .../accumulo/tserver/session/SessionManager.java | 10 +- .../apache/accumulo/test/ConditionalWriterIT.java | 2 +- .../compaction/ExternalDoNothingCompactor.java | 5 +- .../apache/accumulo/test/functional/ScanIdIT.java | 2 +- .../accumulo/test/manager/SuspendedTabletsIT.java | 2 +- .../test/performance/scan/CollectTabletStats.java | 4 +- 41 files changed, 383 insertions(+), 173 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/classloader/DefaultContextClassLoaderFactory.java b/core/src/main/java/org/apache/accumulo/core/classloader/DefaultContextClassLoaderFactory.java index e19c38b..6baaa64 100644 --- a/core/src/main/java/org/apache/accumulo/core/classloader/DefaultContextClassLoaderFactory.java +++ b/core/src/main/java/org/apache/accumulo/core/classloader/DefaultContextClassLoaderFactory.java @@ -22,6 +22,7 @@ import static java.util.concurrent.TimeUnit.MINUTES; import java.util.Map; import java.util.Set; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -62,7 +63,7 @@ public class DefaultContextClassLoaderFactory implements 
ContextClassLoaderFacto private static void startCleanupThread(final AccumuloConfiguration conf, final Supplier<Map<String,String>> contextConfigSupplier) { - ThreadPools.createGeneralScheduledExecutorService(conf) + ScheduledFuture<?> future = ThreadPools.createGeneralScheduledExecutorService(conf) .scheduleWithFixedDelay(Threads.createNamedRunnable(className + "-cleanup", () -> { LOG.trace("{}-cleanup thread, properties: {}", className, conf); Set<String> contextsInUse = contextConfigSupplier.get().keySet().stream() @@ -71,6 +72,7 @@ public class DefaultContextClassLoaderFactory implements ContextClassLoaderFacto LOG.trace("{}-cleanup thread, contexts in use: {}", className, contextsInUse); AccumuloVFSClassLoader.removeUnusedContexts(contextsInUse); }), 1, 1, MINUTES); + ThreadPools.watchNonCriticalScheduledTask(future); LOG.debug("Context cleanup timer started at 60s intervals"); } diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java index ff0d596..d566bae 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java @@ -38,6 +38,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -113,6 +114,7 @@ class ConditionalWriterImpl implements ConditionalWriter { private Map<String,ServerQueue> serverQueues; private DelayQueue<QCMutation> failedMutations = new DelayQueue<>(); private ScheduledThreadPoolExecutor threadPool; + private final ScheduledFuture<?> failureTaskFuture; private class RQIterator implements 
Iterator<Result> { @@ -380,12 +382,15 @@ class ConditionalWriterImpl implements ConditionalWriter { queue(mutations); }; - threadPool.scheduleAtFixedRate(failureHandler, 250, 250, MILLISECONDS); + failureTaskFuture = threadPool.scheduleAtFixedRate(failureHandler, 250, 250, MILLISECONDS); } @Override public Iterator<Result> write(Iterator<ConditionalMutation> mutations) { + ThreadPools.ensureRunning(failureTaskFuture, + "Background task that re-queues failed mutations has exited."); + BlockingQueue<Result> resultQueue = new LinkedBlockingQueue<>(); List<QCMutation> mutationList = new ArrayList<>(); diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java index 9047622..b2374bc 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java @@ -37,6 +37,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -126,6 +127,7 @@ public class TabletServerBatchWriter implements AutoCloseable { // latency timers private final ScheduledThreadPoolExecutor executor; + private ScheduledFuture<?> latencyTimerFuture; private final Map<String,TimeoutTracker> timeoutTrackers = Collections.synchronizedMap(new HashMap<>()); @@ -214,17 +216,18 @@ public class TabletServerBatchWriter implements AutoCloseable { this.writer = new MutationWriter(config.getMaxWriteThreads()); if (this.maxLatency != Long.MAX_VALUE) { - executor.scheduleWithFixedDelay(Threads.createNamedRunnable("BatchWriterLatencyTimer", () -> { - try { - synchronized (TabletServerBatchWriter.this) { - if ((System.currentTimeMillis() - 
lastProcessingStartTime) - > TabletServerBatchWriter.this.maxLatency) - startProcessing(); - } - } catch (Exception e) { - updateUnknownErrors("Max latency task failed " + e.getMessage(), e); - } - }), 0, this.maxLatency / 4, MILLISECONDS); + latencyTimerFuture = executor + .scheduleWithFixedDelay(Threads.createNamedRunnable("BatchWriterLatencyTimer", () -> { + try { + synchronized (TabletServerBatchWriter.this) { + if ((System.currentTimeMillis() - lastProcessingStartTime) + > TabletServerBatchWriter.this.maxLatency) + startProcessing(); + } + } catch (Exception e) { + updateUnknownErrors("Max latency task failed " + e.getMessage(), e); + } + }), 0, this.maxLatency / 4, MILLISECONDS); } } @@ -248,6 +251,10 @@ public class TabletServerBatchWriter implements AutoCloseable { throw new IllegalStateException("Closed"); if (m.size() == 0) throw new IllegalArgumentException("Can not add empty mutations"); + if (this.latencyTimerFuture != null) { + ThreadPools.ensureRunning(this.latencyTimerFuture, + "Latency timer thread has exited, cannot guarantee latency target"); + } checkForFailures(); @@ -576,14 +583,17 @@ public class TabletServerBatchWriter implements AutoCloseable { private MutationSet recentFailures = null; private long initTime; private final Runnable task; + private final ScheduledFuture<?> future; FailedMutations() { task = Threads.createNamedRunnable("failed mutationBatchWriterLatencyTimers handler", this::run); - executor.scheduleWithFixedDelay(task, 0, 500, MILLISECONDS); + future = executor.scheduleWithFixedDelay(task, 0, 500, MILLISECONDS); } private MutationSet init() { + ThreadPools.ensureRunning(future, + "Background task that re-queues failed mutations has exited."); if (recentFailures == null) { recentFailures = new MutationSet(); initTime = System.currentTimeMillis(); @@ -775,7 +785,7 @@ public class TabletServerBatchWriter implements AutoCloseable { for (String server : servers) if (!queued.contains(server)) { - sendThreadPool.submit(new 
SendTask(server)); + sendThreadPool.execute(new SendTask(server)); queued.add(server); } } diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java index b66fa36..ad426b1 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java @@ -28,6 +28,7 @@ import java.util.Objects; import java.util.PriorityQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; @@ -160,8 +161,9 @@ public class LruBlockCache extends SynchronousLoadingBlockCache implements Block } else { this.evictionThread = null; } - this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), statThreadPeriod, - statThreadPeriod, SECONDS); + ScheduledFuture<?> future = this.scheduleThreadPool.scheduleAtFixedRate( + new StatisticsThread(this), statThreadPeriod, statThreadPeriod, SECONDS); + ThreadPools.watchNonCriticalScheduledTask(future); } public long getOverhead() { diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java index 6553319..9819923 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import 
java.util.function.Supplier; import org.apache.accumulo.core.file.blockfile.cache.impl.ClassSize; @@ -71,7 +72,9 @@ public final class TinyLfuBlockCache implements BlockCache { }).maximumWeight(conf.getMaxSize(type)).recordStats().build(); policy = cache.policy().eviction().get(); maxSize = (int) Math.min(Integer.MAX_VALUE, policy.getMaximum()); - statsExecutor.scheduleAtFixedRate(this::logStats, STATS_PERIOD_SEC, STATS_PERIOD_SEC, SECONDS); + ScheduledFuture<?> future = statsExecutor.scheduleAtFixedRate(this::logStats, STATS_PERIOD_SEC, + STATS_PERIOD_SEC, SECONDS); + ThreadPools.watchNonCriticalScheduledTask(future); } @Override diff --git a/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java b/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java index 17b7a52..6a0938f 100644 --- a/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java +++ b/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java @@ -396,7 +396,10 @@ public class Gatherer { // when all processing is done, check for failed files... 
and if found starting processing // again - future.thenRun(this::updateFuture); + @SuppressWarnings("unused") + CompletableFuture<Void> unused = future.thenRun(() -> { + CompletableFuture<ProcessedFiles> unused2 = this.updateFuture(); + }); } catch (Exception e) { future = CompletableFuture.completedFuture(new ProcessedFiles()); // force future to have this exception @@ -449,7 +452,8 @@ public class Gatherer { @Override public synchronized boolean isDone() { - updateFuture(); + @SuppressWarnings("unused") + CompletableFuture<ProcessedFiles> unused = updateFuture(); if (future.isDone()) { if (future.isCancelled() || future.isCompletedExceptionally()) { return true; @@ -459,7 +463,7 @@ public class Gatherer { if (pf.failedFiles.isEmpty()) { return true; } else { - updateFuture(); + unused = updateFuture(); } } diff --git a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java index f31d26b..ae61337 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java +++ b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java @@ -25,6 +25,7 @@ import java.lang.ref.WeakReference; import java.util.HashMap; import java.util.Map; import java.util.WeakHashMap; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; @@ -43,6 +44,7 @@ public class SharedRateLimiterFactory { private static final long REPORT_RATE = 60000; private static final long UPDATE_RATE = 1000; private static SharedRateLimiterFactory instance = null; + private static ScheduledFuture<?> updateTaskFuture; private final Logger log = LoggerFactory.getLogger(SharedRateLimiterFactory.class); private final WeakHashMap<String,WeakReference<SharedRateLimiter>> activeLimiters = new 
WeakHashMap<>(); @@ -55,13 +57,14 @@ public class SharedRateLimiterFactory { instance = new SharedRateLimiterFactory(); ScheduledThreadPoolExecutor svc = ThreadPools.createGeneralScheduledExecutorService(conf); - svc.scheduleWithFixedDelay(Threads + updateTaskFuture = svc.scheduleWithFixedDelay(Threads .createNamedRunnable("SharedRateLimiterFactory update polling", instance::updateAll), UPDATE_RATE, UPDATE_RATE, MILLISECONDS); - svc.scheduleWithFixedDelay(Threads + ScheduledFuture<?> future = svc.scheduleWithFixedDelay(Threads .createNamedRunnable("SharedRateLimiterFactory report polling", instance::reportAll), REPORT_RATE, REPORT_RATE, MILLISECONDS); + ThreadPools.watchNonCriticalScheduledTask(future); } return instance; @@ -91,6 +94,9 @@ public class SharedRateLimiterFactory { */ public RateLimiter create(String name, RateProvider rateProvider) { synchronized (activeLimiters) { + if (updateTaskFuture.isDone()) { + log.warn("SharedRateLimiterFactory update task has failed."); + } var limiterRef = activeLimiters.get(name); var limiter = limiterRef == null ? 
null : limiterRef.get(); if (limiter == null) { diff --git a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java index 67d08ba..afa05c0 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java +++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java @@ -21,9 +21,14 @@ package org.apache.accumulo.core.util.threads; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; +import java.util.Iterator; +import java.util.List; import java.util.OptionalInt; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledFuture; @@ -40,13 +45,132 @@ import org.apache.accumulo.core.trace.TraceUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; + +@SuppressFBWarnings(value = "RV_EXCEPTION_NOT_THROWN", + justification = "Throwing Error for it to be caught by AccumuloUncaughtExceptionHandler") public class ThreadPools { + public static class ExecutionError extends Error { + + private static final long serialVersionUID = 1L; + + public ExecutionError(String message, Throwable cause) { + super(message, cause); + } + } + private static final Logger LOG = LoggerFactory.getLogger(ThreadPools.class); // the number of seconds before we allow a thread to terminate with non-use. 
public static final long DEFAULT_TIMEOUT_MILLISECS = 180000L; + private static final ThreadPoolExecutor SCHEDULED_FUTURE_CHECKER_POOL = + createFixedThreadPool(1, "Scheduled Future Checker", false); + + private static final ConcurrentLinkedQueue<ScheduledFuture<?>> CRITICAL_RUNNING_TASKS = + new ConcurrentLinkedQueue<>(); + + private static final ConcurrentLinkedQueue<ScheduledFuture<?>> NON_CRITICAL_RUNNING_TASKS = + new ConcurrentLinkedQueue<>(); + + private static Runnable TASK_CHECKER = new Runnable() { + @Override + public void run() { + final List<ConcurrentLinkedQueue<ScheduledFuture<?>>> queues = + List.of(CRITICAL_RUNNING_TASKS, NON_CRITICAL_RUNNING_TASKS); + while (true) { + queues.forEach(q -> { + Iterator<ScheduledFuture<?>> tasks = q.iterator(); + while (tasks.hasNext()) { + if (checkTaskFailed(tasks.next(), q)) { + tasks.remove(); + } + } + }); + try { + TimeUnit.MINUTES.sleep(1); + } catch (InterruptedException ie) { + // This thread was interrupted by something while sleeping. We don't want to exit + // this thread, so reset the interrupt state on this thread and keep going. + Thread.interrupted(); + } + } + } + }; + + /** + * Checks to see if a ScheduledFuture has exited successfully or thrown an error + * + * @param future + * scheduled future to check + * @param taskQueue + * the running task queue from which the future came + * @return true if the future should be removed + */ + private static boolean checkTaskFailed(ScheduledFuture<?> future, + ConcurrentLinkedQueue<ScheduledFuture<?>> taskQueue) { + // Calling get() on a ScheduledFuture will block unless that scheduled task has + // completed. We call isDone() here instead. If the scheduled task is done then + // either it was a one-shot task, cancelled or an exception was thrown. + if (future.isDone()) { + // Now call get() to see if we get an exception. + try { + future.get(); + // If we get here, then a scheduled task exited but did not throw an error + // or get canceled. 
This was likely a one-shot scheduled task (I don't think + // we can tell if it's one-shot or not, I think we have to assume that it is + // and that a recurring task would not normally be complete). + return true; + } catch (ExecutionException ee) { + // An exception was thrown in the critical task. Throw the error here, which + // will then be caught by the AccumuloUncaughtExceptionHandler which will + // log the error and terminate the VM. + if (taskQueue == CRITICAL_RUNNING_TASKS) { + throw new ExecutionError("Critical scheduled background task failed.", ee); + } else { + LOG.error("Non-critical scheduled background task failed", ee); + return true; + } + } catch (CancellationException ce) { + // do nothing here as it appears that the task was canceled. Remove it from + // the list of critical tasks + return true; + } catch (InterruptedException ie) { + // current thread was interrupted waiting for get to return, which in theory, + // shouldn't happen since the task is done. + LOG.info("Interrupted while waiting to check on scheduled background task."); + // Reset the interrupt state on this thread + Thread.interrupted(); + } + } + return false; + } + + static { + SCHEDULED_FUTURE_CHECKER_POOL.execute(TASK_CHECKER); + } + + public static void watchCriticalScheduledTask(ScheduledFuture<?> future) { + CRITICAL_RUNNING_TASKS.add(future); + } + + public static void watchNonCriticalScheduledTask(ScheduledFuture<?> future) { + NON_CRITICAL_RUNNING_TASKS.add(future); + } + + public static void ensureRunning(ScheduledFuture<?> future, String message) { + if (future.isDone()) { + try { + future.get(); + } catch (Exception e) { + throw new IllegalStateException(message, e); + } + // it exited w/o exception, but we still expect it to be running so throw an exception. 
+ throw new IllegalStateException(message); + } + } + /** * Resize ThreadPoolExecutor based on current value of maxThreads * diff --git a/core/src/main/java/org/apache/accumulo/fate/Fate.java b/core/src/main/java/org/apache/accumulo/fate/Fate.java index 0ca8a0a..d467dc3 100644 --- a/core/src/main/java/org/apache/accumulo/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/fate/Fate.java @@ -240,7 +240,7 @@ public class Fate<T> { final ThreadPoolExecutor pool = ThreadPools.createExecutorService(conf, Property.MANAGER_FATE_THREADPOOL_SIZE, true); fatePoolWatcher = ThreadPools.createGeneralScheduledExecutorService(conf); - fatePoolWatcher.schedule(() -> { + ThreadPools.watchCriticalScheduledTask(fatePoolWatcher.schedule(() -> { // resize the pool if the property changed ThreadPools.resizePool(pool, conf, Property.MANAGER_FATE_THREADPOOL_SIZE); // If the pool grew, then ensure that there is a TransactionRunner for each thread @@ -262,7 +262,7 @@ public class Fate<T> { } } } - }, 3, SECONDS); + }, 3, SECONDS)); executor = pool; } diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java index 0358c66..9ce5649 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java @@ -248,7 +248,7 @@ public class MultiThreadedRFileTest { } }; for (int i = 0; i < maxThreads; i++) { - pool.submit(runnable); + pool.execute(runnable); } } finally { pool.shutdown(); diff --git a/pom.xml b/pom.xml index 363e64e..a638932 100644 --- a/pom.xml +++ b/pom.xml @@ -1674,7 +1674,7 @@ <arg>-XDcompilePolicy=simple</arg> <arg> -Xplugin:ErrorProne \ - -XepExcludedPaths:.*/(proto|thrift|generated-sources)/.* \ + -XepExcludedPaths:.*/(proto|thrift|generated-sources|src/test)/.* \ -XepDisableWarningsInGeneratedCode \ -XepDisableAllWarnings \ <!-- 
error/warning patterns to ignore --> @@ -1682,6 +1682,7 @@ -Xep:CheckReturnValue:OFF \ -Xep:MustBeClosedChecker:OFF \ -Xep:ReturnValueIgnored:OFF \ + -Xep:FutureReturnValueIgnored:ERROR \ -Xep:UnicodeInCode:OFF \ <!-- error/warning patterns to specifically check --> -Xep:ExpectedExceptionChecker \ diff --git a/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java b/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java index aa59057..5d2bafd 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java +++ b/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java @@ -34,6 +34,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -423,7 +424,7 @@ public class ServerContext extends ClientContext { } private void monitorSwappiness() { - getScheduledExecutor().scheduleWithFixedDelay(() -> { + ScheduledFuture<?> future = getScheduledExecutor().scheduleWithFixedDelay(() -> { try { String procFile = "/proc/sys/vm/swappiness"; File swappiness = new File(procFile); @@ -445,6 +446,7 @@ public class ServerContext extends ClientContext { log.error("", t); } }, SECONDS.toMillis(1), MINUTES.toMillis(10), TimeUnit.MILLISECONDS); + ThreadPools.watchNonCriticalScheduledTask(future); } /** diff --git a/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java index a290c85..d68c5ea 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java +++ b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java @@ -153,7 +153,7 @@ public class BulkImporter { assignments.put(mapFile, tabletsToAssignMapFileTo); } }; - threadPool.submit(getAssignments); + 
threadPool.execute(getAssignments); } threadPool.shutdown(); while (!threadPool.isTerminated()) { @@ -396,7 +396,7 @@ public class BulkImporter { } }; - threadPool.submit(estimationTask); + threadPool.execute(estimationTask); } threadPool.shutdown(); @@ -541,7 +541,7 @@ public class BulkImporter { for (Entry<String,Map<KeyExtent,List<PathSize>>> entry : assignmentsPerTabletServer .entrySet()) { String location = entry.getKey(); - threadPool.submit(new AssignmentTask(assignmentFailures, location, entry.getValue())); + threadPool.execute(new AssignmentTask(assignmentFailures, location, entry.getValue())); } threadPool.shutdown(); diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionWatcher.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionWatcher.java index ba5e5fc..d5c987a 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionWatcher.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionWatcher.java @@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.server.ServerContext; import org.slf4j.LoggerFactory; @@ -122,8 +123,8 @@ public class CompactionWatcher implements Runnable { public static synchronized void startWatching(ServerContext context) { if (!watching) { - context.getScheduledExecutor().scheduleWithFixedDelay( - new CompactionWatcher(context.getConfiguration()), 10000, 10000, TimeUnit.MILLISECONDS); + ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor().scheduleWithFixedDelay( + new CompactionWatcher(context.getConfiguration()), 10000, 10000, TimeUnit.MILLISECONDS)); watching = true; } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java 
b/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java index 4763f59..cd276e5 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java @@ -50,6 +50,7 @@ import org.apache.accumulo.core.iteratorsImpl.system.TimeSettingIterator; import org.apache.accumulo.core.metadata.TabletFile; import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; +import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.problems.ProblemReport; import org.apache.accumulo.server.problems.ProblemReportingIterator; @@ -163,8 +164,9 @@ public class FileManager { this.reservedReaders = new HashMap<>(); this.maxIdleTime = this.context.getConfiguration().getTimeInMillis(Property.TSERV_MAX_IDLE); - this.context.getScheduledExecutor().scheduleWithFixedDelay(new IdleFileCloser(), maxIdleTime, - maxIdleTime / 2, TimeUnit.MILLISECONDS); + ThreadPools.watchCriticalScheduledTask( + this.context.getScheduledExecutor().scheduleWithFixedDelay(new IdleFileCloser(), + maxIdleTime, maxIdleTime / 2, TimeUnit.MILLISECONDS)); this.slowFilePermitMillis = this.context.getConfiguration().getTimeInMillis(Property.TSERV_SLOW_FILEPERMIT_MILLIS); diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java index a88af20..fbfec53 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java @@ -45,6 +45,7 @@ import org.apache.accumulo.core.util.AddressUtil; import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.HostAndPort; import org.apache.accumulo.core.util.ServerServices; +import 
org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.fate.zookeeper.ServiceLock; import org.apache.accumulo.fate.zookeeper.ZooCache; import org.apache.accumulo.fate.zookeeper.ZooCache.ZcStat; @@ -254,8 +255,9 @@ public class LiveTServerSet implements Watcher { public synchronized void startListeningForTabletServerChanges() { scanServers(); - this.context.getScheduledExecutor().scheduleWithFixedDelay(this::scanServers, 0, 5000, - TimeUnit.MILLISECONDS); + + ThreadPools.watchCriticalScheduledTask(this.context.getScheduledExecutor() + .scheduleWithFixedDelay(this::scanServers, 0, 5000, TimeUnit.MILLISECONDS)); } public synchronized void scanServers() { diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java index eeccf7e..80e48cf 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java @@ -314,20 +314,22 @@ public class TServerUtils { final ThreadPoolExecutor pool = ThreadPools.createFixedThreadPool(executorThreads, threadTimeOut, TimeUnit.MILLISECONDS, serverName + "-ClientPool", true); // periodically adjust the number of threads we need by checking how busy our threads are - ThreadPools.createGeneralScheduledExecutorService(conf).scheduleWithFixedDelay(() -> { - // there is a minor race condition between sampling the current state of the thread pool and - // adjusting it - // however, this isn't really an issue, since it adjusts periodically anyway - if (pool.getCorePoolSize() <= pool.getActiveCount()) { - int larger = pool.getCorePoolSize() + Math.min(pool.getQueue().size(), 2); - ThreadPools.resizePool(pool, () -> larger, serverName + "-ClientPool"); - } else { - if (pool.getCorePoolSize() > pool.getActiveCount() + 3) { - int smaller = Math.max(executorThreads, pool.getCorePoolSize() - 1); - ThreadPools.resizePool(pool, 
() -> smaller, serverName + "-ClientPool"); - } - } - }, timeBetweenThreadChecks, timeBetweenThreadChecks, TimeUnit.MILLISECONDS); + ThreadPools.watchCriticalScheduledTask( + ThreadPools.createGeneralScheduledExecutorService(conf).scheduleWithFixedDelay(() -> { + // there is a minor race condition between sampling the current state of the thread pool + // and + // adjusting it + // however, this isn't really an issue, since it adjusts periodically anyway + if (pool.getCorePoolSize() <= pool.getActiveCount()) { + int larger = pool.getCorePoolSize() + Math.min(pool.getQueue().size(), 2); + ThreadPools.resizePool(pool, () -> larger, serverName + "-ClientPool"); + } else { + if (pool.getCorePoolSize() > pool.getActiveCount() + 3) { + int smaller = Math.max(executorThreads, pool.getCorePoolSize() - 1); + ThreadPools.resizePool(pool, () -> smaller, serverName + "-ClientPool"); + } + } + }, timeBetweenThreadChecks, timeBetweenThreadChecks, TimeUnit.MILLISECONDS)); return pool; } diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/FileSystemMonitor.java b/server/base/src/main/java/org/apache/accumulo/server/util/FileSystemMonitor.java index 39ce84a..11d19cd 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/FileSystemMonitor.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/FileSystemMonitor.java @@ -117,20 +117,20 @@ public class FileSystemMonitor { // Create a task to check each mount periodically to see if its state has changed. 
for (Mount mount : mounts) { - ThreadPools.createGeneralScheduledExecutorService(conf).scheduleWithFixedDelay( - Threads.createNamedRunnable(mount.mountPoint + "filesystem monitor", () -> { - try { - checkMount(mount); - } catch (final Exception e) { - Halt.halt(-42, new Runnable() { - @Override - public void run() { - log.error("Exception while checking mount points, halting process", e); + ThreadPools.watchCriticalScheduledTask( + ThreadPools.createGeneralScheduledExecutorService(conf).scheduleWithFixedDelay( + Threads.createNamedRunnable(mount.mountPoint + "filesystem monitor", () -> { + try { + checkMount(mount); + } catch (final Exception e) { + Halt.halt(-42, new Runnable() { + @Override + public void run() { + log.error("Exception while checking mount points, halting process", e); + } + }); } - }); - } - }), period, period, TimeUnit.MILLISECONDS); - + }), period, period, TimeUnit.MILLISECONDS)); } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java b/server/base/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java index 14914f4..ba22df7 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java @@ -156,7 +156,7 @@ public class RemoveEntriesForMissingFiles { processing.add(map); } - threadPool.submit( + threadPool.execute( new CheckFileTask(cache, fs, missing, writer, key, map, processing, exceptionRef)); } diff --git a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java index 3a424dc..cbbf81b 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java +++ b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java @@ -32,6 +32,7 @@ import 
java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy; @@ -217,19 +218,20 @@ public class DistributedWorkQueue { lookForWork(processor, children); // Add a little jitter to avoid all the tservers slamming zookeeper at once - context.getScheduledExecutor().scheduleWithFixedDelay(new Runnable() { - @Override - public void run() { - log.debug("Looking for work in {}", path); - try { - lookForWork(processor, zoo.getChildren(path)); - } catch (KeeperException e) { - log.error("Failed to look for work", e); - } catch (InterruptedException e) { - log.info("Interrupted looking for work", e); - } - } - }, timerInitialDelay, timerPeriod, TimeUnit.MILLISECONDS); + ThreadPools.watchCriticalScheduledTask( + context.getScheduledExecutor().scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + log.debug("Looking for work in {}", path); + try { + lookForWork(processor, zoo.getChildren(path)); + } catch (KeeperException e) { + log.error("Failed to look for work", e); + } catch (InterruptedException e) { + log.info("Interrupted looking for work", e); + } + } + }, timerInitialDelay, timerPeriod, TimeUnit.MILLISECONDS)); } /** @@ -240,6 +242,7 @@ public class DistributedWorkQueue { } public void addWork(String workId, byte[] data) throws KeeperException, InterruptedException { + if (workId.equalsIgnoreCase(LOCKS_NODE)) throw new IllegalArgumentException("locks is reserved work id"); diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index d534062..77a5c3b 100644 
--- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@ -28,6 +28,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -162,12 +163,16 @@ public class CompactionCoordinator extends AbstractServer } protected void startGCLogger(ScheduledThreadPoolExecutor schedExecutor) { - schedExecutor.scheduleWithFixedDelay(() -> gcLogger.logGCInfo(getConfiguration()), 0, - TIME_BETWEEN_GC_CHECKS, TimeUnit.MILLISECONDS); + ScheduledFuture<?> future = + schedExecutor.scheduleWithFixedDelay(() -> gcLogger.logGCInfo(getConfiguration()), 0, + TIME_BETWEEN_GC_CHECKS, TimeUnit.MILLISECONDS); + ThreadPools.watchNonCriticalScheduledTask(future); } protected void startCompactionCleaner(ScheduledThreadPoolExecutor schedExecutor) { - schedExecutor.scheduleWithFixedDelay(() -> cleanUpCompactors(), 0, 5, TimeUnit.MINUTES); + ScheduledFuture<?> future = + schedExecutor.scheduleWithFixedDelay(() -> cleanUpCompactors(), 0, 5, TimeUnit.MINUTES); + ThreadPools.watchNonCriticalScheduledTask(future); } protected void printStartupMsg() { diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java index 424deb7..196ac2a 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java @@ -83,8 +83,8 @@ public class CompactionFinalizer { processPending(); }); - 
schedExecutor.scheduleWithFixedDelay(() -> notifyTservers(), 0, tserverCheckInterval, - TimeUnit.MILLISECONDS); + ThreadPools.watchCriticalScheduledTask(schedExecutor.scheduleWithFixedDelay( + () -> notifyTservers(), 0, tserverCheckInterval, TimeUnit.MILLISECONDS)); } public void commitCompaction(ExternalCompactionId ecid, KeyExtent extent, long fileSize, diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java index e4ee3bd..5a9e526 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java @@ -34,6 +34,7 @@ import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; +import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.server.ServerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -138,12 +139,12 @@ public class DeadCompactionDetector { long interval = this.context.getConfiguration() .getTimeInMillis(Property.COMPACTION_COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL); - schedExecutor.scheduleWithFixedDelay(() -> { + ThreadPools.watchCriticalScheduledTask(schedExecutor.scheduleWithFixedDelay(() -> { try { detectDeadCompactions(); } catch (RuntimeException e) { log.warn("Failed to look for dead compactions", e); } - }, 0, interval, TimeUnit.MILLISECONDS); + }, 0, interval, TimeUnit.MILLISECONDS)); } } diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 
14b0545..d1cc457 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -196,14 +197,16 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac } protected void startGCLogger(ScheduledThreadPoolExecutor schedExecutor) { - schedExecutor.scheduleWithFixedDelay(() -> gcLogger.logGCInfo(getConfiguration()), 0, - TIME_BETWEEN_GC_CHECKS, TimeUnit.MILLISECONDS); + ScheduledFuture<?> future = + schedExecutor.scheduleWithFixedDelay(() -> gcLogger.logGCInfo(getConfiguration()), 0, + TIME_BETWEEN_GC_CHECKS, TimeUnit.MILLISECONDS); + ThreadPools.watchNonCriticalScheduledTask(future); } protected void startCancelChecker(ScheduledThreadPoolExecutor schedExecutor, long timeBetweenChecks) { - schedExecutor.scheduleWithFixedDelay(() -> checkIfCanceled(), 0, timeBetweenChecks, - TimeUnit.MILLISECONDS); + ThreadPools.watchCriticalScheduledTask(schedExecutor.scheduleWithFixedDelay( + () -> checkIfCanceled(), 0, timeBetweenChecks, TimeUnit.MILLISECONDS)); } protected void checkIfCanceled() { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 7071569..7f360de 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -41,6 +41,7 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import 
java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -252,11 +253,12 @@ public class Manager extends AbstractServer if (newState == ManagerState.STOP) { // Give the server a little time before shutdown so the client // thread requesting the stop can return - getContext().getScheduledExecutor().scheduleWithFixedDelay(() -> { + ScheduledFuture<?> future = getContext().getScheduledExecutor().scheduleWithFixedDelay(() -> { // This frees the main thread and will cause the manager to exit clientService.stop(); Manager.this.nextEvent.event("stopped event loop"); }, 100L, 1000L, TimeUnit.MILLISECONDS); + ThreadPools.watchNonCriticalScheduledTask(future); } if (oldState != newState && (newState == ManagerState.HAVE_LOCK)) { @@ -930,7 +932,7 @@ public class Manager extends AbstractServer // unresponsive tservers. sleepUninterruptibly(Math.max(1, rpcTimeout / 120_000), TimeUnit.MILLISECONDS); } - tp.submit(() -> { + tp.execute(() -> { try { Thread t = Thread.currentThread(); String oldName = t.getName(); @@ -1150,8 +1152,8 @@ public class Manager extends AbstractServer fate = new Fate<>(this, store, TraceRepo::toLogString); fate.startTransactionRunners(getConfiguration()); - context.getScheduledExecutor().scheduleWithFixedDelay(store::ageOff, 63000, 63000, - TimeUnit.MILLISECONDS); + ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor() + .scheduleWithFixedDelay(store::ageOff, 63000, 63000, TimeUnit.MILLISECONDS)); } catch (KeeperException | InterruptedException e) { throw new IllegalStateException("Exception setting up FaTE cleanup thread", e); } @@ -1198,7 +1200,7 @@ public class Manager extends AbstractServer // if the replication name is ever set, then start replication services final AtomicReference<TServer> replServer = new AtomicReference<>(); - context.getScheduledExecutor().scheduleWithFixedDelay(() -> { + ScheduledFuture<?> 
future = context.getScheduledExecutor().scheduleWithFixedDelay(() -> { try { @SuppressWarnings("deprecation") Property p = Property.REPLICATION_NAME; @@ -1210,6 +1212,7 @@ public class Manager extends AbstractServer log.error("Error occurred starting replication services. ", e); } }, 0, 5000, TimeUnit.MILLISECONDS); + ThreadPools.watchNonCriticalScheduledTask(future); // checking stored user hashes if any of them uses an outdated algorithm security.validateStoredUserCreditentials(); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerTime.java b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerTime.java index 41077bd..237a207 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerTime.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerTime.java @@ -64,9 +64,9 @@ public class ManagerTime { throw new IOException("Error updating manager time", ex); } - ThreadPools.createGeneralScheduledExecutorService(conf).scheduleWithFixedDelay( - Threads.createNamedRunnable("Manager time keeper", () -> run()), 0, SECONDS.toMillis(10), - MILLISECONDS); + ThreadPools.watchCriticalScheduledTask(ThreadPools.createGeneralScheduledExecutorService(conf) + .scheduleWithFixedDelay(Threads.createNamedRunnable("Manager time keeper", () -> run()), 0, + SECONDS.toMillis(10), MILLISECONDS)); } /** diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ReplicationMetrics.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ReplicationMetrics.java index 79eb5ac..c90337b 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ReplicationMetrics.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ReplicationMetrics.java @@ -25,6 +25,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import 
java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -165,8 +166,9 @@ public class ReplicationMetrics implements MetricsProducer { ThreadPools.createScheduledExecutorService(1, "replicationMetricsPoller", false); Runtime.getRuntime().addShutdownHook(new Thread(scheduler::shutdownNow)); long minimumRefreshDelay = TimeUnit.SECONDS.toMillis(5); - scheduler.scheduleAtFixedRate(this::update, minimumRefreshDelay, minimumRefreshDelay, - TimeUnit.MILLISECONDS); + ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(this::update, minimumRefreshDelay, + minimumRefreshDelay, TimeUnit.MILLISECONDS); + ThreadPools.watchNonCriticalScheduledTask(future); } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java index 055092f..42e18db 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java @@ -20,6 +20,7 @@ package org.apache.accumulo.manager.metrics.fate; import java.util.Map.Entry; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -155,14 +156,14 @@ public class FateMetrics implements MetricsProducer { ThreadPools.createScheduledExecutorService(1, "fateMetricsPoller", false); Runtime.getRuntime().addShutdownHook(new Thread(scheduler::shutdownNow)); - scheduler.scheduleAtFixedRate(() -> { + ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> { try { update(); } catch (Exception ex) { log.info("Failed to update fate metrics due to exception", ex); } }, refreshDelay, refreshDelay, TimeUnit.MILLISECONDS); - + ThreadPools.watchNonCriticalScheduledTask(future); } } diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java b/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java index 0503963..a1ba264 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java @@ -30,6 +30,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.Constants; @@ -105,7 +106,8 @@ public class RecoveryManager { manager.getVolumeManager(), new Path(source)); if (time > 0) { - executor.schedule(this, time, TimeUnit.MILLISECONDS); + ScheduledFuture<?> future = executor.schedule(this, time, TimeUnit.MILLISECONDS); + ThreadPools.watchNonCriticalScheduledTask(future); rescheduled = true; } else { initiateSort(sortId, source, destination); @@ -210,8 +212,9 @@ public class RecoveryManager { log.info("Starting recovery of {} (in : {}s), tablet {} holds a reference", filename, (delay / 1000), extent); - executor.schedule(new LogSortTask(closer, filename, dest, sortId), delay, - TimeUnit.MILLISECONDS); + ScheduledFuture<?> future = executor.schedule( + new LogSortTask(closer, filename, dest, sortId), delay, TimeUnit.MILLISECONDS); + ThreadPools.watchNonCriticalScheduledTask(future); closeTasksQueued.add(sortId); recoveryDelay.put(sortId, delay); } diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/util/logging/AccumuloMonitorAppender.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/util/logging/AccumuloMonitorAppender.java index 368e2a6..f62bb8b 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/util/logging/AccumuloMonitorAppender.java +++ 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/util/logging/AccumuloMonitorAppender.java @@ -26,8 +26,10 @@ import java.net.URI; import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpRequest.BodyPublishers; +import java.net.http.HttpResponse; import java.net.http.HttpResponse.BodyHandlers; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; import org.apache.accumulo.core.Constants; @@ -122,7 +124,9 @@ public class AccumuloMonitorAppender extends AbstractAppender { var req = HttpRequest.newBuilder(uri).POST(BodyPublishers.ofString(jsonEvent, UTF_8)) .setHeader("Content-Type", "application/json").build(); - httpClient.sendAsync(req, BodyHandlers.discarding()); + @SuppressWarnings("unused") + CompletableFuture<HttpResponse<Void>> future = + httpClient.sendAsync(req, BodyHandlers.discarding()); } catch (final Exception e) { error("Unable to send HTTP in appender [" + getName() + "]", event, e); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java index df7f5fd..ece1d9c 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java @@ -33,6 +33,7 @@ import org.apache.accumulo.core.manager.thrift.TabletLoadState; import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.server.manager.state.Assignment; import org.apache.accumulo.server.manager.state.TabletStateStore; @@ -220,22 +221,24 @@ class AssignmentHandler implements Runnable { server.enqueueManagerMessage(new 
TabletStatusMessage(TabletLoadState.LOAD_FAILURE, extent)); long reschedule = Math.min((1L << Math.min(32, retryAttempt)) * 1000, MINUTES.toMillis(10)); log.warn(String.format("rescheduling tablet load in %.2f seconds", reschedule / 1000.)); - this.server.getContext().getScheduledExecutor().schedule(new Runnable() { - @Override - public void run() { - log.info("adding tablet {} back to the assignment pool (retry {})", extent, retryAttempt); - AssignmentHandler handler = new AssignmentHandler(server, extent, retryAttempt + 1); - if (extent.isMeta()) { - if (extent.isRootTablet()) { - Threads.createThread("Root tablet assignment retry", handler).start(); - } else { - server.resourceManager.addMetaDataAssignment(extent, log, handler); + ThreadPools.watchCriticalScheduledTask( + this.server.getContext().getScheduledExecutor().schedule(new Runnable() { + @Override + public void run() { + log.info("adding tablet {} back to the assignment pool (retry {})", extent, + retryAttempt); + AssignmentHandler handler = new AssignmentHandler(server, extent, retryAttempt + 1); + if (extent.isMeta()) { + if (extent.isRootTablet()) { + Threads.createThread("Root tablet assignment retry", handler).start(); + } else { + server.resourceManager.addMetaDataAssignment(extent, log, handler); + } + } else { + server.resourceManager.addAssignment(extent, log, handler); + } } - } else { - server.resourceManager.addAssignment(extent, log, handler); - } - } - }, reschedule, TimeUnit.MILLISECONDS); + }, reschedule, TimeUnit.MILLISECONDS)); } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index c4146c1..c53bb91 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -50,6 +50,7 @@ import java.util.UUID; import java.util.concurrent.BlockingDeque; import 
java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -260,7 +261,7 @@ public class TabletServer extends AbstractServer { // This thread will calculate and log out the busiest tablets based on ingest count and // query count every #{logBusiestTabletsDelay} if (numBusyTabletsToLog > 0) { - context.getScheduledExecutor() + ScheduledFuture<?> future = context.getScheduledExecutor() .scheduleWithFixedDelay(Threads.createNamedRunnable("BusyTabletLogger", new Runnable() { private BusiestTracker ingestTracker = BusiestTracker.newBusiestIngestTracker(numBusyTabletsToLog); @@ -285,9 +286,10 @@ public class TabletServer extends AbstractServer { } } }), logBusyTabletsDelay, logBusyTabletsDelay, TimeUnit.MILLISECONDS); + ThreadPools.watchNonCriticalScheduledTask(future); } - context.getScheduledExecutor() + ScheduledFuture<?> future = context.getScheduledExecutor() .scheduleWithFixedDelay(Threads.createNamedRunnable("TabletRateUpdater", new Runnable() { @Override public void run() { @@ -301,6 +303,7 @@ public class TabletServer extends AbstractServer { } } }), 5, 5, TimeUnit.SECONDS); + ThreadPools.watchNonCriticalScheduledTask(future); @SuppressWarnings("deprecation") final long walMaxSize = @@ -348,8 +351,8 @@ public class TabletServer extends AbstractServer { this.resourceManager = new TabletServerResourceManager(context); this.security = AuditedSecurityOperation.getInstance(context); - context.getScheduledExecutor().scheduleWithFixedDelay(TabletLocator::clearLocators, jitter(), - jitter(), TimeUnit.MILLISECONDS); + ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor().scheduleWithFixedDelay( + TabletLocator::clearLocators, jitter(), jitter(), TimeUnit.MILLISECONDS)); walMarker = new WalStateManager(context); // Create the secret manager @@ 
-791,7 +794,7 @@ public class TabletServer extends AbstractServer { // if the replication name is ever set, then start replication services @SuppressWarnings("deprecation") Property p = Property.REPLICATION_NAME; - context.getScheduledExecutor().scheduleWithFixedDelay(() -> { + ScheduledFuture<?> future = context.getScheduledExecutor().scheduleWithFixedDelay(() -> { if (this.replServer == null) { if (!getConfiguration().get(p).isEmpty()) { log.info(p.getKey() + " was set, starting repl services."); @@ -799,36 +802,40 @@ public class TabletServer extends AbstractServer { } } }, 0, 5, TimeUnit.SECONDS); + ThreadPools.watchNonCriticalScheduledTask(future); int tabletCheckFrequency = 30 + random.nextInt(31); // random 30-60 minute delay // Periodically check that metadata of tablets matches what is held in memory - ThreadPools.createGeneralScheduledExecutorService(aconf).scheduleWithFixedDelay(() -> { - final SortedMap<KeyExtent,Tablet> onlineTabletsSnapshot = onlineTablets.snapshot(); - - Map<KeyExtent,Long> updateCounts = new HashMap<>(); - - // gather updateCounts for each tablet - onlineTabletsSnapshot.forEach((ke, tablet) -> { - updateCounts.put(ke, tablet.getUpdateCount()); - }); - - // gather metadata for all tablets readTablets() - try (TabletsMetadata tabletsMetadata = getContext().getAmple().readTablets() - .forTablets(onlineTabletsSnapshot.keySet()).fetch(FILES, LOGS, ECOMP, PREV_ROW).build()) { - - // for each tablet, compare its metadata to what is held in memory - tabletsMetadata.forEach(tabletMetadata -> { - KeyExtent extent = tabletMetadata.getExtent(); - Tablet tablet = onlineTabletsSnapshot.get(extent); - Long counter = updateCounts.get(extent); - tablet.compareTabletInfo(counter, tabletMetadata); - }); - } - }, tabletCheckFrequency, tabletCheckFrequency, TimeUnit.MINUTES); + ThreadPools.watchCriticalScheduledTask( + ThreadPools.createGeneralScheduledExecutorService(aconf).scheduleWithFixedDelay(() -> { + final SortedMap<KeyExtent,Tablet> 
onlineTabletsSnapshot = onlineTablets.snapshot(); + + Map<KeyExtent,Long> updateCounts = new HashMap<>(); + + // gather updateCounts for each tablet + onlineTabletsSnapshot.forEach((ke, tablet) -> { + updateCounts.put(ke, tablet.getUpdateCount()); + }); + + // gather metadata for all tablets readTablets() + try (TabletsMetadata tabletsMetadata = + getContext().getAmple().readTablets().forTablets(onlineTabletsSnapshot.keySet()) + .fetch(FILES, LOGS, ECOMP, PREV_ROW).build()) { + + // for each tablet, compare its metadata to what is held in memory + tabletsMetadata.forEach(tabletMetadata -> { + KeyExtent extent = tabletMetadata.getExtent(); + Tablet tablet = onlineTabletsSnapshot.get(extent); + Long counter = updateCounts.get(extent); + tablet.compareTabletInfo(counter, tabletMetadata); + }); + } + }, tabletCheckFrequency, tabletCheckFrequency, TimeUnit.MINUTES)); final long CLEANUP_BULK_LOADED_CACHE_MILLIS = TimeUnit.MINUTES.toMillis(15); - context.getScheduledExecutor().scheduleWithFixedDelay(new BulkImportCacheCleaner(this), - CLEANUP_BULK_LOADED_CACHE_MILLIS, CLEANUP_BULK_LOADED_CACHE_MILLIS, TimeUnit.MILLISECONDS); + ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor().scheduleWithFixedDelay( + new BulkImportCacheCleaner(this), CLEANUP_BULK_LOADED_CACHE_MILLIS, + CLEANUP_BULK_LOADED_CACHE_MILLIS, TimeUnit.MILLISECONDS)); HostAndPort managerHost; while (!serverStopRequested) { @@ -953,8 +960,9 @@ public class TabletServer extends AbstractServer { Runnable replicationWorkThreadPoolResizer = () -> { ThreadPools.resizePool(replicationThreadPool, aconf, Property.REPLICATION_WORKER_THREADS); }; - context.getScheduledExecutor().scheduleWithFixedDelay(replicationWorkThreadPoolResizer, 10, 30, - TimeUnit.SECONDS); + ScheduledFuture<?> future = context.getScheduledExecutor() + .scheduleWithFixedDelay(replicationWorkThreadPoolResizer, 10, 30, TimeUnit.SECONDS); + ThreadPools.watchNonCriticalScheduledTask(future); } public String getClientAddressString() { 
@@ -1019,8 +1027,9 @@ public class TabletServer extends AbstractServer { Runnable gcDebugTask = () -> gcLogger.logGCInfo(getConfiguration()); - context.getScheduledExecutor().scheduleWithFixedDelay(gcDebugTask, 0, TIME_BETWEEN_GC_CHECKS, - TimeUnit.MILLISECONDS); + ScheduledFuture<?> future = context.getScheduledExecutor().scheduleWithFixedDelay(gcDebugTask, + 0, TIME_BETWEEN_GC_CHECKS, TimeUnit.MILLISECONDS); + ThreadPools.watchNonCriticalScheduledTask(future); } public TabletServerStatus getStats(Map<TableId,MapCounter<ScanRunState>> scanCounts) { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java index 190b022..23ed47f 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java @@ -137,8 +137,8 @@ public class TabletServerResourceManager { */ private void modifyThreadPoolSizesAtRuntime(IntSupplier maxThreads, String name, final ThreadPoolExecutor tp) { - context.getScheduledExecutor().scheduleWithFixedDelay( - () -> ThreadPools.resizePool(tp, maxThreads, name), 1, 10, TimeUnit.SECONDS); + ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor().scheduleWithFixedDelay( + () -> ThreadPools.resizePool(tp, maxThreads, name), 1, 10, TimeUnit.SECONDS)); } private ThreadPoolExecutor createPriorityExecutor(ScanExecutorConfig sec, @@ -378,8 +378,8 @@ public class TabletServerResourceManager { // We can use the same map for both metadata and normal assignments since the keyspace (extent) // is guaranteed to be unique. Schedule the task once, the task will reschedule itself. 
- context.getScheduledExecutor().schedule( - new AssignmentWatcher(acuConf, context, activeAssignments), 5000, TimeUnit.MILLISECONDS); + ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor().schedule( + new AssignmentWatcher(acuConf, context, activeAssignments), 5000, TimeUnit.MILLISECONDS)); } /** @@ -439,7 +439,8 @@ public class TabletServerResourceManager { if (log.isTraceEnabled()) { log.trace("Rescheduling assignment watcher to run in {}ms", delay); } - context.getScheduledExecutor().schedule(this, delay, TimeUnit.MILLISECONDS); + ThreadPools.watchCriticalScheduledTask( + context.getScheduledExecutor().schedule(this, delay, TimeUnit.MILLISECONDS)); } } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java index b884e61..92089a0 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java @@ -263,7 +263,7 @@ public class TabletServerLogger { return; } nextLogMaker = ThreadPools.createFixedThreadPool(1, "WALog creator", true); - nextLogMaker.submit(new Runnable() { + nextLogMaker.execute(new Runnable() { @Override public void run() { final ServerResources conf = tserver.getServerConfig(); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/CompactionExecutorsMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/CompactionExecutorsMetrics.java index e5d79ad..955d3a2 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/CompactionExecutorsMetrics.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/CompactionExecutorsMetrics.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ScheduledExecutorService; +import 
java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.IntSupplier; @@ -68,9 +69,9 @@ public class CompactionExecutorsMetrics implements MetricsProducer { ThreadPools.createScheduledExecutorService(1, "compactionExecutorsMetricsPoller", false); Runtime.getRuntime().addShutdownHook(new Thread(scheduler::shutdownNow)); long minimumRefreshDelay = TimeUnit.SECONDS.toMillis(5); - scheduler.scheduleAtFixedRate(this::update, minimumRefreshDelay, minimumRefreshDelay, - TimeUnit.MILLISECONDS); - + ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(this::update, minimumRefreshDelay, + minimumRefreshDelay, TimeUnit.MILLISECONDS); + ThreadPools.watchNonCriticalScheduledTask(future); } public synchronized AutoCloseable addExecutor(CompactionExecutorId ceid, diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java index ec10d73..ae43495 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java @@ -29,6 +29,7 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -72,8 +73,8 @@ public class SessionManager { Runnable r = () -> sweep(maxIdle, maxUpdateIdle); - context.getScheduledExecutor().scheduleWithFixedDelay(r, 0, Math.max(maxIdle / 2, 1000), - TimeUnit.MILLISECONDS); + ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor().scheduleWithFixedDelay(r, + 0, Math.max(maxIdle / 2, 1000), TimeUnit.MILLISECONDS)); } public long createSession(Session session, boolean reserve) { @@ -278,8 +279,9 @@ public 
class SessionManager { } }; - ThreadPools.createGeneralScheduledExecutorService(aconf).schedule(r, delay, - TimeUnit.MILLISECONDS); + ScheduledFuture<?> future = ThreadPools.createGeneralScheduledExecutorService(aconf) + .schedule(r, delay, TimeUnit.MILLISECONDS); + ThreadPools.watchNonCriticalScheduledTask(future); } } diff --git a/test/src/main/java/org/apache/accumulo/test/ConditionalWriterIT.java b/test/src/main/java/org/apache/accumulo/test/ConditionalWriterIT.java index 48ee3c1..86a2b08 100644 --- a/test/src/main/java/org/apache/accumulo/test/ConditionalWriterIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ConditionalWriterIT.java @@ -1247,7 +1247,7 @@ public class ConditionalWriterIT extends SharedMiniClusterBase { ExecutorService tp = Executors.newFixedThreadPool(5); for (int i = 0; i < 5; i++) { - tp.submit(new MutatorTask(tableName, client, rows, cw, failed)); + tp.execute(new MutatorTask(tableName, client, rows, cw, failed)); } tp.shutdown(); diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java index 87b5c9f..a6cbc6f 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java @@ -19,6 +19,7 @@ package org.apache.accumulo.test.compaction; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -45,7 +46,9 @@ public class ExternalDoNothingCompactor extends Compactor implements Iface { @Override protected void startCancelChecker(ScheduledThreadPoolExecutor schedExecutor, long timeBetweenChecks) { - schedExecutor.scheduleWithFixedDelay(() -> checkIfCanceled(), 0, 5000, TimeUnit.MILLISECONDS); + 
@SuppressWarnings("unused") + ScheduledFuture<?> future = schedExecutor.scheduleWithFixedDelay(() -> checkIfCanceled(), 0, + 5000, TimeUnit.MILLISECONDS); } @Override diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java index bebd7ec..0e74000 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java @@ -128,7 +128,7 @@ public class ScanIdIT extends AccumuloClusterHarness { for (int scannerIndex = 0; scannerIndex < NUM_SCANNERS; scannerIndex++) { ScannerThread st = new ScannerThread(client, scannerIndex, tableName, latch); scanThreadsToClose.add(st); - pool.submit(st); + pool.execute(st); } // wait for scanners to report a result. diff --git a/test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java b/test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java index 4d17c4c..3905872 100644 --- a/test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java @@ -410,7 +410,7 @@ public class SuspendedTabletsIT extends ConfigurableMacBase { answer.scan(ctx, tableName, metaName); return answer; }); - THREAD_POOL.submit(tlsFuture); + THREAD_POOL.execute(tlsFuture); return tlsFuture.get(5, SECONDS); } catch (TimeoutException ex) { log.debug("Retrieval timed out", ex); diff --git a/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java b/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java index 4f93d3f..1358ab7 100644 --- a/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java +++ b/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java @@ -233,7 +233,7 @@ public class CollectTabletStats { } for (final KeyExtent ke : 
tabletsToTest) { - threadPool.submit(() -> { + threadPool.execute(() -> { try { calcTabletStats(client, opts.tableName, opts.auths, ke, columns); } catch (Exception e) { @@ -318,7 +318,7 @@ public class CollectTabletStats { CountDownLatch finishedSignal = new CountDownLatch(numThreads); for (Test test : tests) { - threadPool.submit(test); + threadPool.execute(test); test.setSignals(startSignal, finishedSignal); }