This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new 9c436cb466 Use General ScheduledExecutorThreadPool where possible (#5472) 9c436cb466 is described below commit 9c436cb466490702b953482c07acf5a2d562af8d Author: Dave Marion <dlmar...@apache.org> AuthorDate: Mon Apr 14 12:31:50 2025 -0400 Use General ScheduledExecutorThreadPool where possible (#5472) Noticed several places in the code where different thread pools were being created based on the GENERAL_THREADPOOL_SIZE property instead of re-using the one available via the ServerContext. --- .../accumulo/core/clientImpl/ClientContext.java | 1 - .../java/org/apache/accumulo/core/fate/Fate.java | 55 +++++++++++----------- .../util/ratelimit/SharedRateLimiterFactory.java | 10 ++-- .../org/apache/accumulo/compactor/Compactor.java | 3 +- .../apache/accumulo/compactor/CompactorTest.java | 4 ++ .../java/org/apache/accumulo/manager/Manager.java | 2 +- .../tserver/compactions/CompactionService.java | 4 +- .../accumulo/test/fate/zookeeper/FateIT.java | 11 +++-- 8 files changed, 45 insertions(+), 45 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java index 03bd7b66bc..c9e714f74c 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java @@ -1132,5 +1132,4 @@ public class ClientContext implements AccumuloClient { } return this.zkLockChecker; } - } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java index 279a1bf099..aac1921914 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@ -62,7 +62,6 @@ public class Fate<T> { private final TStore<T> store; private final T environment; - private ScheduledThreadPoolExecutor fatePoolWatcher; private ExecutorService executor; private static final EnumSet<TStatus> FINISHED_STATES = EnumSet.of(FAILED, SUCCESSFUL, UNKNOWN); @@ -243,8 +242,9 @@ public class Fate<T> { /** * Creates a Fault-tolerant executor. * <p> - * Note: Users of this class should call {@link #startTransactionRunners(AccumuloConfiguration)} - * to launch the worker threads after creating a Fate object. + * Note: Users of this class should call + * {@link #startTransactionRunners(AccumuloConfiguration, ScheduledThreadPoolExecutor)} to launch + * the worker threads after creating a Fate object. * * @param toLogStrFunc A function that converts Repo to Strings that are suitable for logging */ @@ -256,34 +256,34 @@ public class Fate<T> { /** * Launches the specified number of worker threads. */ - public void startTransactionRunners(AccumuloConfiguration conf) { + public void startTransactionRunners(AccumuloConfiguration conf, + ScheduledThreadPoolExecutor serverGeneralScheduledThreadPool) { final ThreadPoolExecutor pool = ThreadPools.getServerThreadPools().createExecutorService(conf, Property.MANAGER_FATE_THREADPOOL_SIZE, true); - fatePoolWatcher = - ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(conf); - ThreadPools.watchCriticalScheduledTask(fatePoolWatcher.scheduleWithFixedDelay(() -> { - // resize the pool if the property changed - ThreadPools.resizePool(pool, conf, Property.MANAGER_FATE_THREADPOOL_SIZE); - // If the pool grew, then ensure that there is a TransactionRunner for each thread - int needed = conf.getCount(Property.MANAGER_FATE_THREADPOOL_SIZE) - pool.getActiveCount(); - if (needed > 0) { - for (int i = 0; i < needed; i++) { - try { - pool.execute(new TransactionRunner()); - } catch (RejectedExecutionException e) { - // RejectedExecutionException could be shutting down - if (pool.isShutdown()) { - // The exception is expected in this case, no need to spam the logs. - log.trace("Error adding transaction runner to FaTE executor pool.", e); - } else { - // This is bad, FaTE may no longer work! - log.error("Error adding transaction runner to FaTE executor pool.", e); + ThreadPools + .watchCriticalScheduledTask(serverGeneralScheduledThreadPool.scheduleWithFixedDelay(() -> { + // resize the pool if the property changed + ThreadPools.resizePool(pool, conf, Property.MANAGER_FATE_THREADPOOL_SIZE); + // If the pool grew, then ensure that there is a TransactionRunner for each thread + int needed = conf.getCount(Property.MANAGER_FATE_THREADPOOL_SIZE) - pool.getActiveCount(); + if (needed > 0) { + for (int i = 0; i < needed; i++) { + try { + pool.execute(new TransactionRunner()); + } catch (RejectedExecutionException e) { + // RejectedExecutionException could be shutting down + if (pool.isShutdown()) { + // The exception is expected in this case, no need to spam the logs. + log.trace("Error adding transaction runner to FaTE executor pool.", e); + } else { + // This is bad, FaTE may no longer work! + log.error("Error adding transaction runner to FaTE executor pool.", e); + } + break; + } } - break; } - } - } - }, 3, 30, SECONDS)); + }, 3, 30, SECONDS)); executor = pool; } @@ -421,7 +421,6 @@ public class Fate<T> { */ public void shutdown() { keepRunning.set(false); - fatePoolWatcher.shutdown(); executor.shutdown(); } diff --git a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java index 95bc63fbac..7b086522d5 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java +++ b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java @@ -30,7 +30,6 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; -import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; import org.slf4j.Logger; @@ -52,17 +51,16 @@ public class SharedRateLimiterFactory { private SharedRateLimiterFactory() {} /** Get the singleton instance of the SharedRateLimiterFactory. */ - public static synchronized SharedRateLimiterFactory getInstance(AccumuloConfiguration conf) { + public static synchronized SharedRateLimiterFactory + getInstance(ScheduledThreadPoolExecutor executor) { if (instance == null) { instance = new SharedRateLimiterFactory(); - ScheduledThreadPoolExecutor svc = - ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(conf); - updateTaskFuture = svc.scheduleWithFixedDelay(Threads + updateTaskFuture = executor.scheduleWithFixedDelay(Threads .createNamedRunnable("SharedRateLimiterFactory update polling", instance::updateAll), UPDATE_RATE, UPDATE_RATE, MILLISECONDS); - ScheduledFuture<?> future = svc.scheduleWithFixedDelay(Threads + ScheduledFuture<?> future = executor.scheduleWithFixedDelay(Threads .createNamedRunnable("SharedRateLimiterFactory report polling", instance::reportAll), REPORT_RATE, REPORT_RATE, MILLISECONDS); ThreadPools.watchNonCriticalScheduledTask(future); diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 109f287ffa..9d95a73429 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -686,8 +686,7 @@ public class Compactor extends AbstractServer metricsInfo.init(getServiceTags(clientAddress)); var watcher = new CompactionWatcher(getConfiguration()); - var schedExecutor = ThreadPools.getServerThreadPools() - .createGeneralScheduledExecutorService(getConfiguration()); + var schedExecutor = getContext().getScheduledExecutor(); startGCLogger(schedExecutor); startCancelChecker(schedExecutor, getConfiguration().getTimeInMillis(Property.COMPACTOR_CANCEL_CHECK_INTERVAL)); diff --git a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java index 74aaf09fa3..3489e8fef8 100644 --- a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java +++ b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java @@ -300,6 +300,10 @@ public class CompactorTest { return List.of(); } + @Override + protected void startCancelChecker(ScheduledThreadPoolExecutor schedExecutor, + long timeBetweenChecks) {} + } public class FailedCompactor extends SuccessfulCompactor implements ServerProcessService.Iface { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 616dc7dac4..8cb51d160b 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -1353,7 +1353,7 @@ public class Manager extends AbstractServer implements LiveTServerSet.Listener, HOURS.toMillis(8), System::currentTimeMillis); Fate<Manager> f = initializeFateInstance(store); - f.startTransactionRunners(getConfiguration()); + f.startTransactionRunners(getConfiguration(), getContext().getScheduledExecutor()); fateRef.set(f); fateReadyLatch.countDown(); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java index 52107f19d0..49d5644795 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java @@ -119,9 +119,9 @@ public class CompactionService { this.rateLimit.set(maxRate); - this.readLimiter = SharedRateLimiterFactory.getInstance(this.context.getConfiguration()) + this.readLimiter = SharedRateLimiterFactory.getInstance(this.context.getScheduledExecutor()) .create("CS_" + serviceName + "_read", () -> rateLimit.get()); - this.writeLimiter = SharedRateLimiterFactory.getInstance(this.context.getConfiguration()) + this.writeLimiter = SharedRateLimiterFactory.getInstance(this.context.getScheduledExecutor()) .create("CS_" + serviceName + "_write", () -> rateLimit.get()); initParams.getRequestedExecutors().forEach((ceid, numThreads) -> { diff --git a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java index bc92312d41..9962d3941f 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java @@ -40,6 +40,7 @@ import java.util.ArrayList; import java.util.List; import java.util.UUID; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.accumulo.core.Constants; @@ -235,7 +236,7 @@ public class FateIT { fate.seedTransaction("TestOperation", txid, new TestOperation(NS, TID), true, "Test Op"); assertEquals(TStatus.SUBMITTED, getTxStatus(zk, txid)); - fate.startTransactionRunners(config); + fate.startTransactionRunners(config, new ScheduledThreadPoolExecutor(2)); // Wait for the transaction runner to be scheduled. UtilWaitThread.sleep(3000); @@ -300,7 +301,7 @@ public class FateIT { ConfigurationCopy config = new ConfigurationCopy(); config.set(Property.GENERAL_THREADPOOL_SIZE, "2"); config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1"); - fate.startTransactionRunners(config); + fate.startTransactionRunners(config, new ScheduledThreadPoolExecutor(2)); // Wait for the transaction runner to be scheduled. UtilWaitThread.sleep(3000); @@ -376,7 +377,7 @@ public class FateIT { ConfigurationCopy config = new ConfigurationCopy(); config.set(Property.GENERAL_THREADPOOL_SIZE, "2"); config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1"); - fate.startTransactionRunners(config); + fate.startTransactionRunners(config, new ScheduledThreadPoolExecutor(2)); // Wait for the transaction runner to be scheduled. UtilWaitThread.sleep(3000); @@ -430,7 +431,7 @@ public class FateIT { fate.seedTransaction("TestOperation", txid, new TestOperation(NS, TID), true, "Test Op"); assertEquals(SUBMITTED, getTxStatus(zk, txid)); - fate.startTransactionRunners(config); + fate.startTransactionRunners(config, new ScheduledThreadPoolExecutor(2)); // Wait for the transaction runner to be scheduled. UtilWaitThread.sleep(3000); @@ -469,7 +470,7 @@ public class FateIT { ConfigurationCopy config = new ConfigurationCopy(); config.set(Property.GENERAL_THREADPOOL_SIZE, "2"); config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1"); - fate.startTransactionRunners(config); + fate.startTransactionRunners(config, new ScheduledThreadPoolExecutor(2)); // Wait for the transaction runner to be scheduled. UtilWaitThread.sleep(3000);