This is an automated email from the ASF dual-hosted git repository. krathbun pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 56e3a617f6b95580c8ed52bc61b603492063ebcc
Merge: ab5c57eb3c d7cbdeceb7
Author: Kevin Rathbun <kevinrr...@gmail.com>
AuthorDate: Mon Oct 21 09:55:18 2024 -0400

    Merge branch '3.1'

 core/src/main/java/org/apache/accumulo/core/fate/Fate.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/fate/Fate.java
index b6860c557d,d4d790a45d..d7c711a20a
--- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@@ -349,10 -237,9 +349,10 @@@ public class Fate<T>
      this.environment = environment;
      final ThreadPoolExecutor pool = ThreadPools.getServerThreadPools().createExecutorService(conf,
          Property.MANAGER_FATE_THREADPOOL_SIZE, true);
 +    this.workQueue = new LinkedTransferQueue<>();
      this.fatePoolWatcher =
          ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(conf);
-     ThreadPools.watchCriticalScheduledTask(fatePoolWatcher.schedule(() -> {
+     ThreadPools.watchCriticalScheduledTask(fatePoolWatcher.scheduleWithFixedDelay(() -> {
        // resize the pool if the property changed
        ThreadPools.resizePool(pool, conf, Property.MANAGER_FATE_THREADPOOL_SIZE);
        // If the pool grew, then ensure that there is a TransactionRunner for each thread
@@@ -374,27 -261,8 +374,27 @@@
            }
          }
        }
-     }, 3, SECONDS));
+     }, 3, 30, SECONDS));
 -    this.executor = pool;
 +    this.transactionExecutor = pool;
 +
 +    ScheduledExecutorService deadResCleanerExecutor = null;
 +    if (runDeadResCleaner) {
 +      // Create a dead reservation cleaner for this store that will periodically clean up
 +      // reservations held by dead processes, if they exist.
 +      deadResCleanerExecutor = ThreadPools.getServerThreadPools().createScheduledExecutorService(1,
 +          store.type() + "-dead-reservation-cleaner-pool");
 +      ScheduledFuture<?> deadReservationCleaner = deadResCleanerExecutor.scheduleWithFixedDelay(
 +          new DeadReservationCleaner(), 3, getDeadResCleanupDelay().toSeconds(), SECONDS);
 +      ThreadPools.watchCriticalScheduledTask(deadReservationCleaner);
 +    }
 +    this.deadResCleanerExecutor = deadResCleanerExecutor;
 +
 +    this.workFinder = Threads.createThread("Fate work finder", new WorkFinder());
 +    this.workFinder.start();
 +  }
 +
 +  public Duration getDeadResCleanupDelay() {
 +    return Duration.ofMinutes(3);
    }

    // get a transaction id back to the requester before doing any work