This is an automated email from the ASF dual-hosted git repository.

krathbun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 56e3a617f6b95580c8ed52bc61b603492063ebcc
Merge: ab5c57eb3c d7cbdeceb7
Author: Kevin Rathbun <kevinrr...@gmail.com>
AuthorDate: Mon Oct 21 09:55:18 2024 -0400

    Merge branch '3.1'

 core/src/main/java/org/apache/accumulo/core/fate/Fate.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/fate/Fate.java
index b6860c557d,d4d790a45d..d7c711a20a
--- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@@ -349,10 -237,9 +349,10 @@@ public class Fate<T> 
      this.environment = environment;
      final ThreadPoolExecutor pool = ThreadPools.getServerThreadPools().createExecutorService(conf,
          Property.MANAGER_FATE_THREADPOOL_SIZE, true);
 +    this.workQueue = new LinkedTransferQueue<>();
      this.fatePoolWatcher =
          ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(conf);
-     ThreadPools.watchCriticalScheduledTask(fatePoolWatcher.schedule(() -> {
+     ThreadPools.watchCriticalScheduledTask(fatePoolWatcher.scheduleWithFixedDelay(() -> {
        // resize the pool if the property changed
        ThreadPools.resizePool(pool, conf, Property.MANAGER_FATE_THREADPOOL_SIZE);
        // If the pool grew, then ensure that there is a TransactionRunner for each thread
@@@ -374,27 -261,8 +374,27 @@@
            }
          }
        }
-     }, 3, SECONDS));
+     }, 3, 30, SECONDS));
 -    this.executor = pool;
 +    this.transactionExecutor = pool;
 +
 +    ScheduledExecutorService deadResCleanerExecutor = null;
 +    if (runDeadResCleaner) {
 +      // Create a dead reservation cleaner for this store that will periodically clean up
 +      // reservations held by dead processes, if they exist.
 +      deadResCleanerExecutor = ThreadPools.getServerThreadPools().createScheduledExecutorService(1,
 +          store.type() + "-dead-reservation-cleaner-pool");
 +      ScheduledFuture<?> deadReservationCleaner = deadResCleanerExecutor.scheduleWithFixedDelay(
 +          new DeadReservationCleaner(), 3, getDeadResCleanupDelay().toSeconds(), SECONDS);
 +      ThreadPools.watchCriticalScheduledTask(deadReservationCleaner);
 +    }
 +    this.deadResCleanerExecutor = deadResCleanerExecutor;
 +
 +    this.workFinder = Threads.createThread("Fate work finder", new WorkFinder());
 +    this.workFinder.start();
 +  }
 +
 +  public Duration getDeadResCleanupDelay() {
 +    return Duration.ofMinutes(3);
    }
  
    // get a transaction id back to the requester before doing any work

Reply via email to