This is an automated email from the ASF dual-hosted git repository.

ddanielr pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new a48b1e6c22 Micrometer thread pool fixes (#5571)
a48b1e6c22 is described below

commit a48b1e6c22ccdfa22128b8500f18fa021a88e980
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Thu May 22 11:42:56 2025 -0400

    Micrometer thread pool fixes (#5571)
    
    * Remove duplicate thread pools, disable metrics
    
    Modified code to use the existing shared general
    scheduled thread pool from the context. Disabled
    metrics for the general scheduled thread pool so
    that messages regarding duplicate gauges being
    registered do not pollute the logs.
    
    * Modified BatchWriter and ThreadPools
---
 .../DefaultContextClassLoaderFactory.java          | 24 +++++++------
 .../core/clientImpl/TabletServerBatchWriter.java   |  5 +--
 .../accumulo/core/util/threads/ThreadPools.java    | 38 +++++++++++++++++----
 .../accumulo/server/metrics/MetricsInfoImpl.java   |  4 +++
 .../coordinator/CompactionCoordinator.java         | 39 ++++++++++------------
 .../coordinator/CompactionCoordinatorTest.java     | 10 +++---
 .../TestCompactionCoordinatorForOfflineTable.java  | 10 +++---
 7 files changed, 79 insertions(+), 51 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/classloader/DefaultContextClassLoaderFactory.java
 
b/core/src/main/java/org/apache/accumulo/core/classloader/DefaultContextClassLoaderFactory.java
index 19a8d38579..cc829a908e 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/classloader/DefaultContextClassLoaderFactory.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/classloader/DefaultContextClassLoaderFactory.java
@@ -23,6 +23,7 @@ import static java.util.concurrent.TimeUnit.MINUTES;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -72,17 +73,18 @@ public class DefaultContextClassLoaderFactory implements 
ContextClassLoaderFacto
 
   private static void startCleanupThread(final AccumuloConfiguration conf,
       final Supplier<Map<String,String>> contextConfigSupplier) {
-    ScheduledFuture<?> future = ThreadPools.getClientThreadPools((t, e) -> {
-      LOG.error("context classloader cleanup thread has failed.", e);
-    }).createGeneralScheduledExecutorService(conf)
-        .scheduleWithFixedDelay(Threads.createNamedRunnable(className + 
"-cleanup", () -> {
-          LOG.trace("{}-cleanup thread, properties: {}", className, conf);
-          Set<String> contextsInUse = 
contextConfigSupplier.get().keySet().stream()
-              .map(p -> 
p.substring(VFS_CONTEXT_CLASSPATH_PROPERTY.getKey().length()))
-              .collect(Collectors.toSet());
-          LOG.trace("{}-cleanup thread, contexts in use: {}", className, 
contextsInUse);
-          removeUnusedContexts(contextsInUse);
-        }), 1, 1, MINUTES);
+    ScheduledFuture<?> future =
+        ((ScheduledThreadPoolExecutor) ThreadPools.getClientThreadPools((t, e) 
-> {
+          LOG.error("context classloader cleanup thread has failed.", e);
+        }).createExecutorService(conf, Property.GENERAL_THREADPOOL_SIZE, 
false))
+            .scheduleWithFixedDelay(Threads.createNamedRunnable(className + 
"-cleanup", () -> {
+              LOG.trace("{}-cleanup thread, properties: {}", className, conf);
+              Set<String> contextsInUse = 
contextConfigSupplier.get().keySet().stream()
+                  .map(p -> 
p.substring(VFS_CONTEXT_CLASSPATH_PROPERTY.getKey().length()))
+                  .collect(Collectors.toSet());
+              LOG.trace("{}-cleanup thread, contexts in use: {}", className, 
contextsInUse);
+              removeUnusedContexts(contextsInUse);
+            }), 1, 1, MINUTES);
     ThreadPools.watchNonCriticalScheduledTask(future);
     LOG.debug("Context cleanup timer started at 60s intervals");
   }
diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
index 46fce95cfc..cda943d5c5 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
@@ -117,6 +117,7 @@ import io.opentelemetry.context.Scope;
 public class TabletServerBatchWriter implements AutoCloseable {
 
   private static final Logger log = 
LoggerFactory.getLogger(TabletServerBatchWriter.class);
+  private static final AtomicInteger numWritersCreated = new AtomicInteger(0);
 
   // basic configuration
   private final ClientContext context;
@@ -210,8 +211,8 @@ public class TabletServerBatchWriter implements 
AutoCloseable {
 
   public TabletServerBatchWriter(ClientContext context, BatchWriterConfig 
config) {
     this.context = context;
-    this.executor = context.threadPools()
-        
.createGeneralScheduledExecutorService(this.context.getConfiguration());
+    this.executor = context.threadPools().createScheduledExecutorService(2,
+        "BatchWriterThreads-" + numWritersCreated.incrementAndGet(), true);
     this.failedMutations = new FailedMutations();
     this.maxMem = config.getMaxMemory();
     this.maxLatency = config.getMaxLatency(MILLISECONDS) <= 0 ? Long.MAX_VALUE
diff --git 
a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java 
b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
index 136065beb4..b30505c1fc 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
@@ -41,6 +41,7 @@ import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMM
 import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_WORKQ_POOL;
 
 import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.OptionalInt;
@@ -57,6 +58,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.IntSupplier;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -69,6 +71,7 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Preconditions;
 
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import io.micrometer.core.instrument.MeterRegistry;
 import io.micrometer.core.instrument.Metrics;
 import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;
 
@@ -97,7 +100,9 @@ public class ThreadPools {
   }
 
   public static final ThreadPools 
getClientThreadPools(UncaughtExceptionHandler ueh) {
-    return new ThreadPools(ueh);
+    ThreadPools clientPools = new ThreadPools(ueh);
+    clientPools.setMeterRegistry(Metrics.globalRegistry);
+    return clientPools;
   }
 
   private static final ThreadPoolExecutor SCHEDULED_FUTURE_CHECKER_POOL =
@@ -601,7 +606,7 @@ public class ThreadPools {
       result.allowCoreThreadTimeOut(true);
     }
     if (emitThreadPoolMetrics) {
-      ThreadPools.addExecutorServiceMetrics(result, name);
+      addExecutorServiceMetrics(result, name);
     }
     return result;
   }
@@ -644,7 +649,7 @@ public class ThreadPools {
    *        errors over long time periods.
    * @return ScheduledThreadPoolExecutor
    */
-  private ScheduledThreadPoolExecutor createScheduledExecutorService(int 
numThreads,
+  public ScheduledThreadPoolExecutor createScheduledExecutorService(int 
numThreads,
       final String name, boolean emitThreadPoolMetrics) {
     LOG.trace("Creating ScheduledThreadPoolExecutor for {} with {} threads", 
name, numThreads);
     var result =
@@ -708,13 +713,34 @@ public class ThreadPools {
 
         };
     if (emitThreadPoolMetrics) {
-      ThreadPools.addExecutorServiceMetrics(result, name);
+      addExecutorServiceMetrics(result, name);
     }
     return result;
   }
 
-  private static void addExecutorServiceMetrics(ExecutorService executor, 
String name) {
-    new ExecutorServiceMetrics(executor, name, 
List.of()).bindTo(Metrics.globalRegistry);
+  private final AtomicReference<MeterRegistry> registry = new 
AtomicReference<>();
+  private final List<ExecutorServiceMetrics> earlyExecutorServices = new 
ArrayList<>();
+
+  private void addExecutorServiceMetrics(ExecutorService executor, String 
name) {
+    ExecutorServiceMetrics esm = new ExecutorServiceMetrics(executor, name, 
List.of());
+    synchronized (earlyExecutorServices) {
+      MeterRegistry r = registry.get();
+      if (r != null) {
+        esm.bindTo(r);
+      } else {
+        earlyExecutorServices.add(esm);
+      }
+    }
+  }
+
+  public void setMeterRegistry(MeterRegistry r) {
+    if (registry.compareAndSet(null, r)) {
+      synchronized (earlyExecutorServices) {
+        earlyExecutorServices.forEach(e -> e.bindTo(r));
+      }
+    } else {
+      throw new IllegalStateException("setMeterRegistry called more than 
once");
+    }
   }
 
 }
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsInfoImpl.java
 
b/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsInfoImpl.java
index 679c70cdae..1249d901c0 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsInfoImpl.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsInfoImpl.java
@@ -31,6 +31,7 @@ import org.apache.accumulo.core.classloader.ClassLoaderUtil;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.metrics.MetricsInfo;
 import org.apache.accumulo.core.metrics.MetricsProducer;
+import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.server.ServerContext;
 import org.checkerframework.checker.nullness.qual.NonNull;
 import org.slf4j.Logger;
@@ -163,6 +164,9 @@ public class MetricsInfoImpl implements MetricsInfo {
       }
     }
 
+    // Set the MeterRegistry on the ThreadPools
+    
ThreadPools.getServerThreadPools().setMeterRegistry(Metrics.globalRegistry);
+
     if (jvmMetricsEnabled) {
       LOG.info("enabling detailed jvm, classloader, jvm gc and process 
metrics");
       new ClassLoaderMetrics().bindTo(Metrics.globalRegistry);
diff --git 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index 11b89fccf6..59a0dd2782 100644
--- 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -34,7 +34,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -139,8 +138,6 @@ public class CompactionCoordinator extends AbstractServer 
implements
 
   private ServiceLock coordinatorLock;
 
-  private final ScheduledThreadPoolExecutor schedExecutor;
-
   private final LoadingCache<String,Integer> compactorCounts;
 
   protected CompactionCoordinator(ServerOpts opts, String[] args) {
@@ -150,14 +147,13 @@ public class CompactionCoordinator extends AbstractServer 
implements
   protected CompactionCoordinator(ServerOpts opts, String[] args, 
AccumuloConfiguration conf) {
     super("compaction-coordinator", opts, args);
     aconf = conf == null ? super.getConfiguration() : conf;
-    schedExecutor = 
ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(aconf);
-    compactionFinalizer = createCompactionFinalizer(schedExecutor);
+    compactionFinalizer = createCompactionFinalizer();
     tserverSet = createLiveTServerSet();
     setupSecurity();
-    startGCLogger(schedExecutor);
+    startGCLogger();
     printStartupMsg();
-    startCompactionCleaner(schedExecutor);
-    startRunningCleaner(schedExecutor);
+    startCompactionCleaner();
+    startRunningCleaner();
     compactorCounts = Caffeine.newBuilder().expireAfterWrite(30, 
TimeUnit.SECONDS)
         .build(queue -> ExternalCompactionUtil.countCompactors(queue, 
getContext()));
   }
@@ -167,9 +163,8 @@ public class CompactionCoordinator extends AbstractServer 
implements
     return aconf;
   }
 
-  protected CompactionFinalizer
-      createCompactionFinalizer(ScheduledThreadPoolExecutor schedExecutor) {
-    return new CompactionFinalizer(getContext(), schedExecutor);
+  protected CompactionFinalizer createCompactionFinalizer() {
+    return new CompactionFinalizer(getContext(), 
getContext().getScheduledExecutor());
   }
 
   protected LiveTServerSet createLiveTServerSet() {
@@ -180,22 +175,22 @@ public class CompactionCoordinator extends AbstractServer 
implements
     security = getContext().getSecurityOperation();
   }
 
-  protected void startGCLogger(ScheduledThreadPoolExecutor schedExecutor) {
-    ScheduledFuture<?> future =
-        schedExecutor.scheduleWithFixedDelay(() -> 
gcLogger.logGCInfo(getConfiguration()), 0,
-            TIME_BETWEEN_GC_CHECKS, TimeUnit.MILLISECONDS);
+  protected void startGCLogger() {
+    ScheduledFuture<?> future = 
getContext().getScheduledExecutor().scheduleWithFixedDelay(
+        () -> gcLogger.logGCInfo(getConfiguration()), 0, 
TIME_BETWEEN_GC_CHECKS,
+        TimeUnit.MILLISECONDS);
     ThreadPools.watchNonCriticalScheduledTask(future);
   }
 
-  protected void startCompactionCleaner(ScheduledThreadPoolExecutor 
schedExecutor) {
-    ScheduledFuture<?> future =
-        schedExecutor.scheduleWithFixedDelay(this::cleanUpCompactors, 0, 5, 
TimeUnit.MINUTES);
+  protected void startCompactionCleaner() {
+    ScheduledFuture<?> future = getContext().getScheduledExecutor()
+        .scheduleWithFixedDelay(this::cleanUpCompactors, 0, 5, 
TimeUnit.MINUTES);
     ThreadPools.watchNonCriticalScheduledTask(future);
   }
 
-  protected void startRunningCleaner(ScheduledThreadPoolExecutor 
schedExecutor) {
-    ScheduledFuture<?> future =
-        schedExecutor.scheduleWithFixedDelay(this::cleanUpRunning, 0, 5, 
TimeUnit.MINUTES);
+  protected void startRunningCleaner() {
+    ScheduledFuture<?> future = getContext().getScheduledExecutor()
+        .scheduleWithFixedDelay(this::cleanUpRunning, 0, 5, TimeUnit.MINUTES);
     ThreadPools.watchNonCriticalScheduledTask(future);
   }
 
@@ -447,7 +442,7 @@ public class CompactionCoordinator extends AbstractServer 
implements
   }
 
   protected void startDeadCompactionDetector() {
-    new DeadCompactionDetector(getContext(), this, schedExecutor).start();
+    new DeadCompactionDetector(getContext(), this, 
getContext().getScheduledExecutor()).start();
   }
 
   protected long getMissingCompactorWarningTime() {
diff --git 
a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
 
b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
index 1f62ede587..8b320aac34 100644
--- 
a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
+++ 
b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
@@ -37,7 +37,6 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.UUID;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
@@ -139,13 +138,16 @@ public class CompactionCoordinatorTest {
     }
 
     @Override
-    protected void startCompactionCleaner(ScheduledThreadPoolExecutor 
schedExecutor) {}
+    protected void startCompactionCleaner() {}
 
     @Override
-    protected CompactionFinalizer 
createCompactionFinalizer(ScheduledThreadPoolExecutor stpe) {
+    protected CompactionFinalizer createCompactionFinalizer() {
       return null;
     }
 
+    @Override
+    protected void startRunningCleaner() {}
+
     @Override
     protected LiveTServerSet createLiveTServerSet() {
       return null;
@@ -155,7 +157,7 @@ public class CompactionCoordinatorTest {
     protected void setupSecurity() {}
 
     @Override
-    protected void startGCLogger(ScheduledThreadPoolExecutor stpe) {}
+    protected void startGCLogger() {}
 
     @Override
     protected void printStartupMsg() {}
diff --git 
a/test/src/main/java/org/apache/accumulo/test/compaction/TestCompactionCoordinatorForOfflineTable.java
 
b/test/src/main/java/org/apache/accumulo/test/compaction/TestCompactionCoordinatorForOfflineTable.java
index a3cc78905e..a49f9439d6 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/compaction/TestCompactionCoordinatorForOfflineTable.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/compaction/TestCompactionCoordinatorForOfflineTable.java
@@ -18,8 +18,6 @@
  */
 package org.apache.accumulo.test.compaction;
 
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-
 import org.apache.accumulo.coordinator.CompactionCoordinator;
 import org.apache.accumulo.coordinator.CompactionFinalizer;
 import org.apache.accumulo.core.client.BatchWriter;
@@ -45,8 +43,8 @@ public class TestCompactionCoordinatorForOfflineTable extends 
CompactionCoordina
     private static final Logger LOG =
         LoggerFactory.getLogger(NonNotifyingCompactionFinalizer.class);
 
-    NonNotifyingCompactionFinalizer(ServerContext context, 
ScheduledThreadPoolExecutor stpe) {
-      super(context, stpe);
+    NonNotifyingCompactionFinalizer(ServerContext context) {
+      super(context, context.getScheduledExecutor());
     }
 
     @Override
@@ -75,8 +73,8 @@ public class TestCompactionCoordinatorForOfflineTable extends 
CompactionCoordina
   }
 
   @Override
-  protected CompactionFinalizer 
createCompactionFinalizer(ScheduledThreadPoolExecutor stpe) {
-    return new NonNotifyingCompactionFinalizer(getContext(), stpe);
+  protected CompactionFinalizer createCompactionFinalizer() {
+    return new NonNotifyingCompactionFinalizer(getContext());
   }
 
   public static void main(String[] args) throws Exception {

Reply via email to