This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch deal-with-unchecked-futures in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 12abc74a961e6ab91d9b4da24a3faa0f55094301 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Thu Feb 24 15:10:13 2022 +0000 Beginning to address unchecked Futures --- .../accumulo/core/clientImpl/ConditionalWriterImpl.java | 12 +++++++++++- .../accumulo/core/clientImpl/TabletServerBatchWriter.java | 2 +- .../core/util/ratelimit/SharedRateLimiterFactory.java | 7 ++++++- core/src/main/java/org/apache/accumulo/fate/Fate.java | 11 ++++++++++- pom.xml | 1 + 5 files changed, 29 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java index 182d59e..f1b5415 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java @@ -36,6 +36,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -111,6 +112,7 @@ class ConditionalWriterImpl implements ConditionalWriter { private Map<String,ServerQueue> serverQueues; private DelayQueue<QCMutation> failedMutations = new DelayQueue<>(); private ScheduledThreadPoolExecutor threadPool; + private final ScheduledFuture<?> failureTaskFuture; private class RQIterator implements Iterator<Result> { @@ -378,12 +380,20 @@ class ConditionalWriterImpl implements ConditionalWriter { queue(mutations); }; - threadPool.scheduleAtFixedRate(failureHandler, 250, 250, TimeUnit.MILLISECONDS); + failureTaskFuture = + threadPool.scheduleAtFixedRate(failureHandler, 250, 250, TimeUnit.MILLISECONDS); } @Override public Iterator<Result> write(Iterator<ConditionalMutation> mutations) { + synchronized (failureTaskFuture) { + if (failureTaskFuture.isDone()) { + // TabletServerBatchWriter throws a MutationsRejectedException. + throw new RuntimeException("Background task that re-queues failed mutations has failed."); + } + } + BlockingQueue<Result> resultQueue = new LinkedBlockingQueue<>(); List<QCMutation> mutationList = new ArrayList<>(); diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java index fe25407..dfe06d5 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java @@ -775,7 +775,7 @@ public class TabletServerBatchWriter implements AutoCloseable { for (String server : servers) if (!queued.contains(server)) { - sendThreadPool.submit(new SendTask(server)); + sendThreadPool.execute(new SendTask(server)); queued.add(server); } } diff --git a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java index c5c6890..30435e5 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java +++ b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java @@ -22,6 +22,7 @@ import java.lang.ref.WeakReference; import java.util.HashMap; import java.util.Map; import java.util.WeakHashMap; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -41,6 +42,7 @@ public class SharedRateLimiterFactory { private static final long REPORT_RATE = 60000; private static final long UPDATE_RATE = 1000; private static SharedRateLimiterFactory instance = null; + private static ScheduledFuture<?> updateTaskFuture; private final Logger log = LoggerFactory.getLogger(SharedRateLimiterFactory.class); private final WeakHashMap<String,WeakReference<SharedRateLimiter>> activeLimiters = new WeakHashMap<>(); @@ -53,7 +55,7 @@ public class SharedRateLimiterFactory { instance = new SharedRateLimiterFactory(); ScheduledThreadPoolExecutor svc = ThreadPools.createGeneralScheduledExecutorService(conf); - svc.scheduleWithFixedDelay(Threads + updateTaskFuture = svc.scheduleWithFixedDelay(Threads .createNamedRunnable("SharedRateLimiterFactory update polling", instance::updateAll), UPDATE_RATE, UPDATE_RATE, TimeUnit.MILLISECONDS); @@ -89,6 +91,9 @@ public class SharedRateLimiterFactory { */ public RateLimiter create(String name, RateProvider rateProvider) { synchronized (activeLimiters) { + if (updateTaskFuture.isDone()) { + log.warn("SharedRateLimiterFactory update task has failed."); + } var limiterRef = activeLimiters.get(name); var limiter = limiterRef == null ? null : limiterRef.get(); if (limiter == null) { diff --git a/core/src/main/java/org/apache/accumulo/fate/Fate.java b/core/src/main/java/org/apache/accumulo/fate/Fate.java index a2559ea..3b7fc23 100644 --- a/core/src/main/java/org/apache/accumulo/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/fate/Fate.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.EnumSet; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -54,6 +55,7 @@ public class Fate<T> { private final TStore<T> store; private final T environment; private ScheduledThreadPoolExecutor fatePoolWatcher; + private ScheduledFuture<?> fatePoolWatcherFuture; private ExecutorService executor; private static final EnumSet<TStatus> FINISHED_STATES = @@ -66,6 +68,9 @@ public class Fate<T> { @Override public void run() { while (keepRunning.get()) { + if (isFatePoolResizerFailed()) { + log.warn("FaTE thread pool resizer scheduled task has failed."); + } long deferTime = 0; Long tid = null; try { @@ -235,7 +240,7 @@ public class Fate<T> { final ThreadPoolExecutor pool = ThreadPools.createExecutorService(conf, Property.MANAGER_FATE_THREADPOOL_SIZE, true); fatePoolWatcher = ThreadPools.createGeneralScheduledExecutorService(conf); - fatePoolWatcher.schedule(() -> { + fatePoolWatcherFuture = fatePoolWatcher.schedule(() -> { // resize the pool if the property changed ThreadPools.resizePool(pool, conf, Property.MANAGER_FATE_THREADPOOL_SIZE); // If the pool grew, then ensure that there is a TransactionRunner for each thread @@ -266,6 +271,10 @@ public class Fate<T> { return store.create(); } + private synchronized boolean isFatePoolResizerFailed() { + return fatePoolWatcherFuture.isDone(); + } + // start work in the transaction.. it is safe to call this // multiple times for a transaction... but it will only seed once public void seedTransaction(long tid, Repo<T> repo, boolean autoCleanUp, String goalMessage) { diff --git a/pom.xml b/pom.xml index 2018882..acc3e7a 100644 --- a/pom.xml +++ b/pom.xml @@ -1663,6 +1663,7 @@ -Xep:CheckReturnValue:OFF \ -Xep:MustBeClosedChecker:OFF \ -Xep:ReturnValueIgnored:OFF \ + -Xep:FutureReturnValueIgnored:ERROR \ -Xep:UnicodeInCode:OFF \ <!-- error/warning patterns to specifically check --> -Xep:ExpectedExceptionChecker \