This is an automated email from the ASF dual-hosted git repository. ddanielr pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new a48b1e6c22 Micrometer thread pool fixes (#5571) a48b1e6c22 is described below commit a48b1e6c22ccdfa22128b8500f18fa021a88e980 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Thu May 22 11:42:56 2025 -0400 Micrometer thread pool fixes (#5571) * Remove duplicate thread pools, disable metrics Modified code to use existing shared general scheduled thread pool from the context. Disabled metrics for the general scheduled thread pool so that messages regarding duplicate gauges being registered do not pollute the logs. * Modified BatchWriter and ThreadPools --- .../DefaultContextClassLoaderFactory.java | 24 +++++++------ .../core/clientImpl/TabletServerBatchWriter.java | 5 +-- .../accumulo/core/util/threads/ThreadPools.java | 38 +++++++++++++++++---- .../accumulo/server/metrics/MetricsInfoImpl.java | 4 +++ .../coordinator/CompactionCoordinator.java | 39 ++++++++++------------ .../coordinator/CompactionCoordinatorTest.java | 10 +++--- .../TestCompactionCoordinatorForOfflineTable.java | 10 +++--- 7 files changed, 79 insertions(+), 51 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/classloader/DefaultContextClassLoaderFactory.java b/core/src/main/java/org/apache/accumulo/core/classloader/DefaultContextClassLoaderFactory.java index 19a8d38579..cc829a908e 100644 --- a/core/src/main/java/org/apache/accumulo/core/classloader/DefaultContextClassLoaderFactory.java +++ b/core/src/main/java/org/apache/accumulo/core/classloader/DefaultContextClassLoaderFactory.java @@ -23,6 +23,7 @@ import static java.util.concurrent.TimeUnit.MINUTES; import java.util.Map; import java.util.Set; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -72,17 +73,18 @@ public class DefaultContextClassLoaderFactory implements 
ContextClassLoaderFacto private static void startCleanupThread(final AccumuloConfiguration conf, final Supplier<Map<String,String>> contextConfigSupplier) { - ScheduledFuture<?> future = ThreadPools.getClientThreadPools((t, e) -> { - LOG.error("context classloader cleanup thread has failed.", e); - }).createGeneralScheduledExecutorService(conf) - .scheduleWithFixedDelay(Threads.createNamedRunnable(className + "-cleanup", () -> { - LOG.trace("{}-cleanup thread, properties: {}", className, conf); - Set<String> contextsInUse = contextConfigSupplier.get().keySet().stream() - .map(p -> p.substring(VFS_CONTEXT_CLASSPATH_PROPERTY.getKey().length())) - .collect(Collectors.toSet()); - LOG.trace("{}-cleanup thread, contexts in use: {}", className, contextsInUse); - removeUnusedContexts(contextsInUse); - }), 1, 1, MINUTES); + ScheduledFuture<?> future = + ((ScheduledThreadPoolExecutor) ThreadPools.getClientThreadPools((t, e) -> { + LOG.error("context classloader cleanup thread has failed.", e); + }).createExecutorService(conf, Property.GENERAL_THREADPOOL_SIZE, false)) + .scheduleWithFixedDelay(Threads.createNamedRunnable(className + "-cleanup", () -> { + LOG.trace("{}-cleanup thread, properties: {}", className, conf); + Set<String> contextsInUse = contextConfigSupplier.get().keySet().stream() + .map(p -> p.substring(VFS_CONTEXT_CLASSPATH_PROPERTY.getKey().length())) + .collect(Collectors.toSet()); + LOG.trace("{}-cleanup thread, contexts in use: {}", className, contextsInUse); + removeUnusedContexts(contextsInUse); + }), 1, 1, MINUTES); ThreadPools.watchNonCriticalScheduledTask(future); LOG.debug("Context cleanup timer started at 60s intervals"); } diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java index 46fce95cfc..cda943d5c5 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java +++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java @@ -117,6 +117,7 @@ import io.opentelemetry.context.Scope; public class TabletServerBatchWriter implements AutoCloseable { private static final Logger log = LoggerFactory.getLogger(TabletServerBatchWriter.class); + private static final AtomicInteger numWritersCreated = new AtomicInteger(0); // basic configuration private final ClientContext context; @@ -210,8 +211,8 @@ public class TabletServerBatchWriter implements AutoCloseable { public TabletServerBatchWriter(ClientContext context, BatchWriterConfig config) { this.context = context; - this.executor = context.threadPools() - .createGeneralScheduledExecutorService(this.context.getConfiguration()); + this.executor = context.threadPools().createScheduledExecutorService(2, + "BatchWriterThreads-" + numWritersCreated.incrementAndGet(), true); this.failedMutations = new FailedMutations(); this.maxMem = config.getMaxMemory(); this.maxLatency = config.getMaxLatency(MILLISECONDS) <= 0 ? 
Long.MAX_VALUE diff --git a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java index 136065beb4..b30505c1fc 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java +++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java @@ -41,6 +41,7 @@ import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMM import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_WORKQ_POOL; import java.lang.Thread.UncaughtExceptionHandler; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.OptionalInt; @@ -57,6 +58,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.IntSupplier; import org.apache.accumulo.core.conf.AccumuloConfiguration; @@ -69,6 +71,7 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics; @@ -97,7 +100,9 @@ public class ThreadPools { } public static final ThreadPools getClientThreadPools(UncaughtExceptionHandler ueh) { - return new ThreadPools(ueh); + ThreadPools clientPools = new ThreadPools(ueh); + clientPools.setMeterRegistry(Metrics.globalRegistry); + return clientPools; } private static final ThreadPoolExecutor SCHEDULED_FUTURE_CHECKER_POOL = @@ -601,7 +606,7 @@ public class ThreadPools { result.allowCoreThreadTimeOut(true); } if (emitThreadPoolMetrics) { - ThreadPools.addExecutorServiceMetrics(result, name); + addExecutorServiceMetrics(result, name); } return result; 
} @@ -644,7 +649,7 @@ public class ThreadPools { * errors over long time periods. * @return ScheduledThreadPoolExecutor */ - private ScheduledThreadPoolExecutor createScheduledExecutorService(int numThreads, + public ScheduledThreadPoolExecutor createScheduledExecutorService(int numThreads, final String name, boolean emitThreadPoolMetrics) { LOG.trace("Creating ScheduledThreadPoolExecutor for {} with {} threads", name, numThreads); var result = @@ -708,13 +713,34 @@ public class ThreadPools { }; if (emitThreadPoolMetrics) { - ThreadPools.addExecutorServiceMetrics(result, name); + addExecutorServiceMetrics(result, name); } return result; } - private static void addExecutorServiceMetrics(ExecutorService executor, String name) { - new ExecutorServiceMetrics(executor, name, List.of()).bindTo(Metrics.globalRegistry); + private final AtomicReference<MeterRegistry> registry = new AtomicReference<>(); + private final List<ExecutorServiceMetrics> earlyExecutorServices = new ArrayList<>(); + + private void addExecutorServiceMetrics(ExecutorService executor, String name) { + ExecutorServiceMetrics esm = new ExecutorServiceMetrics(executor, name, List.of()); + synchronized (earlyExecutorServices) { + MeterRegistry r = registry.get(); + if (r != null) { + esm.bindTo(r); + } else { + earlyExecutorServices.add(esm); + } + } + } + + public void setMeterRegistry(MeterRegistry r) { + if (registry.compareAndSet(null, r)) { + synchronized (earlyExecutorServices) { + earlyExecutorServices.forEach(e -> e.bindTo(r)); + } + } else { + throw new IllegalStateException("setMeterRegistry called more than once"); + } } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsInfoImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsInfoImpl.java index 679c70cdae..1249d901c0 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsInfoImpl.java +++ 
b/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsInfoImpl.java @@ -31,6 +31,7 @@ import org.apache.accumulo.core.classloader.ClassLoaderUtil; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.metrics.MetricsInfo; import org.apache.accumulo.core.metrics.MetricsProducer; +import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.server.ServerContext; import org.checkerframework.checker.nullness.qual.NonNull; import org.slf4j.Logger; @@ -163,6 +164,9 @@ public class MetricsInfoImpl implements MetricsInfo { } } + // Set the MeterRegistry on the ThreadPools + ThreadPools.getServerThreadPools().setMeterRegistry(Metrics.globalRegistry); + if (jvmMetricsEnabled) { LOG.info("enabling detailed jvm, classloader, jvm gc and process metrics"); new ClassLoaderMetrics().bindTo(Metrics.globalRegistry); diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index 11b89fccf6..59a0dd2782 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@ -34,7 +34,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -139,8 +138,6 @@ public class CompactionCoordinator extends AbstractServer implements private ServiceLock coordinatorLock; - private final ScheduledThreadPoolExecutor schedExecutor; - private final LoadingCache<String,Integer> compactorCounts; protected CompactionCoordinator(ServerOpts opts, String[] args) 
{ @@ -150,14 +147,13 @@ public class CompactionCoordinator extends AbstractServer implements protected CompactionCoordinator(ServerOpts opts, String[] args, AccumuloConfiguration conf) { super("compaction-coordinator", opts, args); aconf = conf == null ? super.getConfiguration() : conf; - schedExecutor = ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(aconf); - compactionFinalizer = createCompactionFinalizer(schedExecutor); + compactionFinalizer = createCompactionFinalizer(); tserverSet = createLiveTServerSet(); setupSecurity(); - startGCLogger(schedExecutor); + startGCLogger(); printStartupMsg(); - startCompactionCleaner(schedExecutor); - startRunningCleaner(schedExecutor); + startCompactionCleaner(); + startRunningCleaner(); compactorCounts = Caffeine.newBuilder().expireAfterWrite(30, TimeUnit.SECONDS) .build(queue -> ExternalCompactionUtil.countCompactors(queue, getContext())); } @@ -167,9 +163,8 @@ public class CompactionCoordinator extends AbstractServer implements return aconf; } - protected CompactionFinalizer - createCompactionFinalizer(ScheduledThreadPoolExecutor schedExecutor) { - return new CompactionFinalizer(getContext(), schedExecutor); + protected CompactionFinalizer createCompactionFinalizer() { + return new CompactionFinalizer(getContext(), getContext().getScheduledExecutor()); } protected LiveTServerSet createLiveTServerSet() { @@ -180,22 +175,22 @@ public class CompactionCoordinator extends AbstractServer implements security = getContext().getSecurityOperation(); } - protected void startGCLogger(ScheduledThreadPoolExecutor schedExecutor) { - ScheduledFuture<?> future = - schedExecutor.scheduleWithFixedDelay(() -> gcLogger.logGCInfo(getConfiguration()), 0, - TIME_BETWEEN_GC_CHECKS, TimeUnit.MILLISECONDS); + protected void startGCLogger() { + ScheduledFuture<?> future = getContext().getScheduledExecutor().scheduleWithFixedDelay( + () -> gcLogger.logGCInfo(getConfiguration()), 0, TIME_BETWEEN_GC_CHECKS, + 
TimeUnit.MILLISECONDS); ThreadPools.watchNonCriticalScheduledTask(future); } - protected void startCompactionCleaner(ScheduledThreadPoolExecutor schedExecutor) { - ScheduledFuture<?> future = - schedExecutor.scheduleWithFixedDelay(this::cleanUpCompactors, 0, 5, TimeUnit.MINUTES); + protected void startCompactionCleaner() { + ScheduledFuture<?> future = getContext().getScheduledExecutor() + .scheduleWithFixedDelay(this::cleanUpCompactors, 0, 5, TimeUnit.MINUTES); ThreadPools.watchNonCriticalScheduledTask(future); } - protected void startRunningCleaner(ScheduledThreadPoolExecutor schedExecutor) { - ScheduledFuture<?> future = - schedExecutor.scheduleWithFixedDelay(this::cleanUpRunning, 0, 5, TimeUnit.MINUTES); + protected void startRunningCleaner() { + ScheduledFuture<?> future = getContext().getScheduledExecutor() + .scheduleWithFixedDelay(this::cleanUpRunning, 0, 5, TimeUnit.MINUTES); ThreadPools.watchNonCriticalScheduledTask(future); } @@ -447,7 +442,7 @@ public class CompactionCoordinator extends AbstractServer implements } protected void startDeadCompactionDetector() { - new DeadCompactionDetector(getContext(), this, schedExecutor).start(); + new DeadCompactionDetector(getContext(), this, getContext().getScheduledExecutor()).start(); } protected long getMissingCompactorWarningTime() { diff --git a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java index 1f62ede587..8b320aac34 100644 --- a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java +++ b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java @@ -37,7 +37,6 @@ import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; import java.util.UUID; -import java.util.concurrent.ScheduledThreadPoolExecutor; import 
java.util.concurrent.atomic.AtomicBoolean; import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; @@ -139,13 +138,16 @@ public class CompactionCoordinatorTest { } @Override - protected void startCompactionCleaner(ScheduledThreadPoolExecutor schedExecutor) {} + protected void startCompactionCleaner() {} @Override - protected CompactionFinalizer createCompactionFinalizer(ScheduledThreadPoolExecutor stpe) { + protected CompactionFinalizer createCompactionFinalizer() { return null; } + @Override + protected void startRunningCleaner() {} + @Override protected LiveTServerSet createLiveTServerSet() { return null; @@ -155,7 +157,7 @@ public class CompactionCoordinatorTest { protected void setupSecurity() {} @Override - protected void startGCLogger(ScheduledThreadPoolExecutor stpe) {} + protected void startGCLogger() {} @Override protected void printStartupMsg() {} diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/TestCompactionCoordinatorForOfflineTable.java b/test/src/main/java/org/apache/accumulo/test/compaction/TestCompactionCoordinatorForOfflineTable.java index a3cc78905e..a49f9439d6 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/TestCompactionCoordinatorForOfflineTable.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/TestCompactionCoordinatorForOfflineTable.java @@ -18,8 +18,6 @@ */ package org.apache.accumulo.test.compaction; -import java.util.concurrent.ScheduledThreadPoolExecutor; - import org.apache.accumulo.coordinator.CompactionCoordinator; import org.apache.accumulo.coordinator.CompactionFinalizer; import org.apache.accumulo.core.client.BatchWriter; @@ -45,8 +43,8 @@ public class TestCompactionCoordinatorForOfflineTable extends CompactionCoordina private static final Logger LOG = LoggerFactory.getLogger(NonNotifyingCompactionFinalizer.class); - NonNotifyingCompactionFinalizer(ServerContext context, ScheduledThreadPoolExecutor stpe) { - super(context, stpe); + 
NonNotifyingCompactionFinalizer(ServerContext context) { + super(context, context.getScheduledExecutor()); } @Override @@ -75,8 +73,8 @@ public class TestCompactionCoordinatorForOfflineTable extends CompactionCoordina } @Override - protected CompactionFinalizer createCompactionFinalizer(ScheduledThreadPoolExecutor stpe) { - return new NonNotifyingCompactionFinalizer(getContext(), stpe); + protected CompactionFinalizer createCompactionFinalizer() { + return new NonNotifyingCompactionFinalizer(getContext()); } public static void main(String[] args) throws Exception {