This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 9c436cb466 Use General ScheduledExecutorThreadPool where possible 
(#5472)
9c436cb466 is described below

commit 9c436cb466490702b953482c07acf5a2d562af8d
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Mon Apr 14 12:31:50 2025 -0400

    Use General ScheduledExecutorThreadPool where possible (#5472)
    
    Noticed several places in the code where different thread
    pools were being created based on the GENERAL_THREADPOOL_SIZE
    property instead of re-using the one available via the
    ServerContext.
---
 .../accumulo/core/clientImpl/ClientContext.java    |  1 -
 .../java/org/apache/accumulo/core/fate/Fate.java   | 55 +++++++++++-----------
 .../util/ratelimit/SharedRateLimiterFactory.java   | 10 ++--
 .../org/apache/accumulo/compactor/Compactor.java   |  3 +-
 .../apache/accumulo/compactor/CompactorTest.java   |  4 ++
 .../java/org/apache/accumulo/manager/Manager.java  |  2 +-
 .../tserver/compactions/CompactionService.java     |  4 +-
 .../accumulo/test/fate/zookeeper/FateIT.java       | 11 +++--
 8 files changed, 45 insertions(+), 45 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
index 03bd7b66bc..c9e714f74c 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
@@ -1132,5 +1132,4 @@ public class ClientContext implements AccumuloClient {
     }
     return this.zkLockChecker;
   }
-
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java 
b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
index 279a1bf099..aac1921914 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@ -62,7 +62,6 @@ public class Fate<T> {
 
   private final TStore<T> store;
   private final T environment;
-  private ScheduledThreadPoolExecutor fatePoolWatcher;
   private ExecutorService executor;
 
   private static final EnumSet<TStatus> FINISHED_STATES = EnumSet.of(FAILED, 
SUCCESSFUL, UNKNOWN);
@@ -243,8 +242,9 @@ public class Fate<T> {
   /**
    * Creates a Fault-tolerant executor.
    * <p>
-   * Note: Users of this class should call {@link 
#startTransactionRunners(AccumuloConfiguration)}
-   * to launch the worker threads after creating a Fate object.
+   * Note: Users of this class should call
+   * {@link #startTransactionRunners(AccumuloConfiguration, 
ScheduledThreadPoolExecutor)} to launch
+   * the worker threads after creating a Fate object.
    *
    * @param toLogStrFunc A function that converts Repo to Strings that are 
suitable for logging
    */
@@ -256,34 +256,34 @@ public class Fate<T> {
   /**
    * Launches the specified number of worker threads.
    */
-  public void startTransactionRunners(AccumuloConfiguration conf) {
+  public void startTransactionRunners(AccumuloConfiguration conf,
+      ScheduledThreadPoolExecutor serverGeneralScheduledThreadPool) {
     final ThreadPoolExecutor pool = 
ThreadPools.getServerThreadPools().createExecutorService(conf,
         Property.MANAGER_FATE_THREADPOOL_SIZE, true);
-    fatePoolWatcher =
-        
ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(conf);
-    
ThreadPools.watchCriticalScheduledTask(fatePoolWatcher.scheduleWithFixedDelay(()
 -> {
-      // resize the pool if the property changed
-      ThreadPools.resizePool(pool, conf, 
Property.MANAGER_FATE_THREADPOOL_SIZE);
-      // If the pool grew, then ensure that there is a TransactionRunner for 
each thread
-      int needed = conf.getCount(Property.MANAGER_FATE_THREADPOOL_SIZE) - 
pool.getActiveCount();
-      if (needed > 0) {
-        for (int i = 0; i < needed; i++) {
-          try {
-            pool.execute(new TransactionRunner());
-          } catch (RejectedExecutionException e) {
-            // RejectedExecutionException could be shutting down
-            if (pool.isShutdown()) {
-              // The exception is expected in this case, no need to spam the 
logs.
-              log.trace("Error adding transaction runner to FaTE executor 
pool.", e);
-            } else {
-              // This is bad, FaTE may no longer work!
-              log.error("Error adding transaction runner to FaTE executor 
pool.", e);
+    ThreadPools
+        
.watchCriticalScheduledTask(serverGeneralScheduledThreadPool.scheduleWithFixedDelay(()
 -> {
+          // resize the pool if the property changed
+          ThreadPools.resizePool(pool, conf, 
Property.MANAGER_FATE_THREADPOOL_SIZE);
+          // If the pool grew, then ensure that there is a TransactionRunner 
for each thread
+          int needed = conf.getCount(Property.MANAGER_FATE_THREADPOOL_SIZE) - 
pool.getActiveCount();
+          if (needed > 0) {
+            for (int i = 0; i < needed; i++) {
+              try {
+                pool.execute(new TransactionRunner());
+              } catch (RejectedExecutionException e) {
+                // RejectedExecutionException could be shutting down
+                if (pool.isShutdown()) {
+                  // The exception is expected in this case, no need to spam 
the logs.
+                  log.trace("Error adding transaction runner to FaTE executor 
pool.", e);
+                } else {
+                  // This is bad, FaTE may no longer work!
+                  log.error("Error adding transaction runner to FaTE executor 
pool.", e);
+                }
+                break;
+              }
             }
-            break;
           }
-        }
-      }
-    }, 3, 30, SECONDS));
+        }, 3, 30, SECONDS));
     executor = pool;
   }
 
@@ -421,7 +421,6 @@ public class Fate<T> {
    */
   public void shutdown() {
     keepRunning.set(false);
-    fatePoolWatcher.shutdown();
     executor.shutdown();
   }
 
diff --git 
a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
 
b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
index 95bc63fbac..7b086522d5 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
@@ -30,7 +30,6 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.core.util.threads.Threads;
 import org.slf4j.Logger;
@@ -52,17 +51,16 @@ public class SharedRateLimiterFactory {
   private SharedRateLimiterFactory() {}
 
   /** Get the singleton instance of the SharedRateLimiterFactory. */
-  public static synchronized SharedRateLimiterFactory 
getInstance(AccumuloConfiguration conf) {
+  public static synchronized SharedRateLimiterFactory
+      getInstance(ScheduledThreadPoolExecutor executor) {
     if (instance == null) {
       instance = new SharedRateLimiterFactory();
 
-      ScheduledThreadPoolExecutor svc =
-          
ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(conf);
-      updateTaskFuture = svc.scheduleWithFixedDelay(Threads
+      updateTaskFuture = executor.scheduleWithFixedDelay(Threads
           .createNamedRunnable("SharedRateLimiterFactory update polling", 
instance::updateAll),
           UPDATE_RATE, UPDATE_RATE, MILLISECONDS);
 
-      ScheduledFuture<?> future = svc.scheduleWithFixedDelay(Threads
+      ScheduledFuture<?> future = executor.scheduleWithFixedDelay(Threads
           .createNamedRunnable("SharedRateLimiterFactory report polling", 
instance::reportAll),
           REPORT_RATE, REPORT_RATE, MILLISECONDS);
       ThreadPools.watchNonCriticalScheduledTask(future);
diff --git 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 109f287ffa..9d95a73429 100644
--- 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -686,8 +686,7 @@ public class Compactor extends AbstractServer
     metricsInfo.init(getServiceTags(clientAddress));
 
     var watcher = new CompactionWatcher(getConfiguration());
-    var schedExecutor = ThreadPools.getServerThreadPools()
-        .createGeneralScheduledExecutorService(getConfiguration());
+    var schedExecutor = getContext().getScheduledExecutor();
     startGCLogger(schedExecutor);
     startCancelChecker(schedExecutor,
         
getConfiguration().getTimeInMillis(Property.COMPACTOR_CANCEL_CHECK_INTERVAL));
diff --git 
a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
 
b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
index 74aaf09fa3..3489e8fef8 100644
--- 
a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
+++ 
b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
@@ -300,6 +300,10 @@ public class CompactorTest {
       return List.of();
     }
 
+    @Override
+    protected void startCancelChecker(ScheduledThreadPoolExecutor 
schedExecutor,
+        long timeBetweenChecks) {}
+
   }
 
   public class FailedCompactor extends SuccessfulCompactor implements 
ServerProcessService.Iface {
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 616dc7dac4..8cb51d160b 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -1353,7 +1353,7 @@ public class Manager extends AbstractServer implements 
LiveTServerSet.Listener,
           HOURS.toMillis(8), System::currentTimeMillis);
 
       Fate<Manager> f = initializeFateInstance(store);
-      f.startTransactionRunners(getConfiguration());
+      f.startTransactionRunners(getConfiguration(), 
getContext().getScheduledExecutor());
       fateRef.set(f);
       fateReadyLatch.countDown();
 
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
index 52107f19d0..49d5644795 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
@@ -119,9 +119,9 @@ public class CompactionService {
 
     this.rateLimit.set(maxRate);
 
-    this.readLimiter = 
SharedRateLimiterFactory.getInstance(this.context.getConfiguration())
+    this.readLimiter = 
SharedRateLimiterFactory.getInstance(this.context.getScheduledExecutor())
         .create("CS_" + serviceName + "_read", () -> rateLimit.get());
-    this.writeLimiter = 
SharedRateLimiterFactory.getInstance(this.context.getConfiguration())
+    this.writeLimiter = 
SharedRateLimiterFactory.getInstance(this.context.getScheduledExecutor())
         .create("CS_" + serviceName + "_write", () -> rateLimit.get());
 
     initParams.getRequestedExecutors().forEach((ceid, numThreads) -> {
diff --git 
a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java 
b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java
index bc92312d41..9962d3941f 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java
@@ -40,6 +40,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.accumulo.core.Constants;
@@ -235,7 +236,7 @@ public class FateIT {
       fate.seedTransaction("TestOperation", txid, new TestOperation(NS, TID), 
true, "Test Op");
       assertEquals(TStatus.SUBMITTED, getTxStatus(zk, txid));
 
-      fate.startTransactionRunners(config);
+      fate.startTransactionRunners(config, new ScheduledThreadPoolExecutor(2));
       // Wait for the transaction runner to be scheduled.
       UtilWaitThread.sleep(3000);
 
@@ -300,7 +301,7 @@ public class FateIT {
       ConfigurationCopy config = new ConfigurationCopy();
       config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
       config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
-      fate.startTransactionRunners(config);
+      fate.startTransactionRunners(config, new ScheduledThreadPoolExecutor(2));
 
       // Wait for the transaction runner to be scheduled.
       UtilWaitThread.sleep(3000);
@@ -376,7 +377,7 @@ public class FateIT {
       ConfigurationCopy config = new ConfigurationCopy();
       config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
       config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
-      fate.startTransactionRunners(config);
+      fate.startTransactionRunners(config, new ScheduledThreadPoolExecutor(2));
 
       // Wait for the transaction runner to be scheduled.
       UtilWaitThread.sleep(3000);
@@ -430,7 +431,7 @@ public class FateIT {
       fate.seedTransaction("TestOperation", txid, new TestOperation(NS, TID), 
true, "Test Op");
       assertEquals(SUBMITTED, getTxStatus(zk, txid));
 
-      fate.startTransactionRunners(config);
+      fate.startTransactionRunners(config, new ScheduledThreadPoolExecutor(2));
       // Wait for the transaction runner to be scheduled.
       UtilWaitThread.sleep(3000);
 
@@ -469,7 +470,7 @@ public class FateIT {
       ConfigurationCopy config = new ConfigurationCopy();
       config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
       config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
-      fate.startTransactionRunners(config);
+      fate.startTransactionRunners(config, new ScheduledThreadPoolExecutor(2));
 
       // Wait for the transaction runner to be scheduled.
       UtilWaitThread.sleep(3000);

Reply via email to