This is an automated email from the ASF dual-hosted git repository. domgarguilo pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new ffbbca7de6 backport server idle metric to 2.1 (#4716) ffbbca7de6 is described below commit ffbbca7de6bf45c84788d185180774cb2b882698 Author: Dom G <domgargu...@apache.org> AuthorDate: Tue Jul 9 15:41:13 2024 -0400 backport server idle metric to 2.1 (#4716) * Backport changes from idle stop PR #4078 * Convert idle metric from Counter to Gauge * Remove the COMPACTOR_BUSY metric as a duplicate of this one --------- Co-authored-by: Dave Marion <dlmar...@apache.org> --- .../org/apache/accumulo/core/conf/Property.java | 3 + .../accumulo/core/metrics/MetricsProducer.java | 18 +-- .../org/apache/accumulo/server/AbstractServer.java | 44 ++++++- .../accumulo/server/metrics/ProcessMetrics.java | 43 +++++++ .../org/apache/accumulo/compactor/Compactor.java | 20 ++-- .../apache/accumulo/compactor/CompactorTest.java | 3 + .../org/apache/accumulo/tserver/ScanServer.java | 4 +- .../org/apache/accumulo/tserver/TabletServer.java | 11 +- .../compaction/ExternalCompactionProgressIT.java | 23 +--- .../test/functional/IdleProcessMetricsIT.java | 130 +++++++++++++++++++++ .../apache/accumulo/test/metrics/MetricsIT.java | 2 +- 11 files changed, 258 insertions(+), 43 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index fe4b8f4c18..629585058a 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -294,6 +294,9 @@ public enum Property { GENERAL_DELEGATION_TOKEN_UPDATE_INTERVAL("general.delegation.token.update.interval", "1d", PropertyType.TIMEDURATION, "The length of time between generation of new secret keys.", "1.7.0"), + GENERAL_IDLE_PROCESS_INTERVAL("general.metrics.process.idle", "5m", PropertyType.TIMEDURATION, + "Amount of time a process must be idle before it is considered to be idle by the metrics system.", + "2.1.3"), GENERAL_MAX_SCANNER_RETRY_PERIOD("general.max.scanner.retry.period", "5s", PropertyType.TIMEDURATION, "The maximum amount of time that a Scanner should wait before retrying a failed RPC.", diff --git a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java index 3fdb1a4309..8cf2ffc956 100644 --- a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java +++ b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java @@ -44,6 +44,13 @@ import io.micrometer.core.instrument.MeterRegistry; * <th>Micrometer Type</th> * <th>Notes</th> * </tr> + * <!-- general server metrics --> + * <tr> + * <td>N/A</td> + * <td>N/A</td> + * <td>{@value #METRICS_SERVER_IDLE}</td> + * <td>Gauge</td> + * <td>Indicates if the server is idle or not. The value will be 1 when idle and 0 when not idle. * <!-- compactor --> * <tr> * <td>N/A</td> @@ -66,14 +73,6 @@ import io.micrometer.core.instrument.MeterRegistry; * <td>FunctionCounter</td> * <td>Number of entries written by all threads performing compactions</td> * </tr> - * <tr> - * <td>N/A</td> - * <td>N/A</td> - * <td>{@value #METRICS_COMPACTOR_BUSY}</td> - * <td>Gauge</td> - * <td>Indicates if the compactor is busy or not. The value will be 0 when idle and 1 when - * busy.</td> - * </tr> * <!-- fate --> * <tr> * <td>currentFateOps</td> @@ -605,11 +604,12 @@ public interface MetricsProducer { Logger LOG = LoggerFactory.getLogger(MetricsProducer.class); + String METRICS_SERVER_IDLE = "accumulo.server.idle"; + String METRICS_COMPACTOR_PREFIX = "accumulo.compactor."; String METRICS_COMPACTOR_MAJC_STUCK = METRICS_COMPACTOR_PREFIX + "majc.stuck"; String METRICS_COMPACTOR_ENTRIES_READ = METRICS_COMPACTOR_PREFIX + "entries.read"; String METRICS_COMPACTOR_ENTRIES_WRITTEN = METRICS_COMPACTOR_PREFIX + "entries.written"; - String METRICS_COMPACTOR_BUSY = METRICS_COMPACTOR_PREFIX + "busy"; String METRICS_FATE_PREFIX = "accumulo.fate."; String METRICS_FATE_TYPE_IN_PROGRESS = METRICS_FATE_PREFIX + "ops.in.progress.by.type"; diff --git a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java index ac1bcaab90..af679c13d3 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java +++ b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java @@ -19,22 +19,32 @@ package org.apache.accumulo.server; import java.util.Objects; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.classloader.ClassLoaderUtil; import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.metrics.MetricsProducer; import org.apache.accumulo.core.trace.TraceUtil; +import org.apache.accumulo.server.metrics.ProcessMetrics; import org.apache.accumulo.server.security.SecurityUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public abstract class AbstractServer implements AutoCloseable, Runnable { +import io.micrometer.core.instrument.MeterRegistry; + +public abstract class AbstractServer implements AutoCloseable, MetricsProducer, Runnable { private final ServerContext context; protected final String applicationName; private final String hostname; private final Logger log; + private final ProcessMetrics processMetrics; + protected final long idleReportingPeriodNanos; + private volatile long idlePeriodStartNanos = 0L; protected AbstractServer(String appName, ServerOpts opts, String[] args) { this.log = LoggerFactory.getLogger(getClass().getName()); @@ -53,6 +63,30 @@ public abstract class AbstractServer implements AutoCloseable, Runnable { // Server-side "client" check to make sure we're logged in as a user we expect to be context.enforceKerberosLogin(); } + processMetrics = new ProcessMetrics(); + idleReportingPeriodNanos = TimeUnit.MILLISECONDS.toNanos( + context.getConfiguration().getTimeInMillis(Property.GENERAL_IDLE_PROCESS_INTERVAL)); + } + + protected void idleProcessCheck(Supplier<Boolean> idleCondition) { + boolean isIdle = idleCondition.get(); + boolean shouldResetIdlePeriod = !isIdle || idleReportingPeriodNanos == 0; + boolean isIdlePeriodNotStarted = idlePeriodStartNanos == 0; + boolean hasExceededIdlePeriod = + (System.nanoTime() - idlePeriodStartNanos) > idleReportingPeriodNanos; + + if (shouldResetIdlePeriod) { + // Reset idle period and set idle metric to false + idlePeriodStartNanos = 0; + processMetrics.setIdleValue(false); + } else if (isIdlePeriodNotStarted) { + // Start tracking idle period + idlePeriodStartNanos = System.nanoTime(); + } else if (hasExceededIdlePeriod) { + // Set idle metric to true and reset the start of the idle period + processMetrics.setIdleValue(true); + idlePeriodStartNanos = 0; + } } /** @@ -76,6 +110,14 @@ public abstract class AbstractServer implements AutoCloseable, Runnable { } } + @Override + public void registerMetrics(MeterRegistry registry) { + // makes mocking subclasses easier + if (processMetrics != null) { + processMetrics.registerMetrics(registry); + } + } + public String getHostname() { return hostname; } diff --git a/server/base/src/main/java/org/apache/accumulo/server/metrics/ProcessMetrics.java b/server/base/src/main/java/org/apache/accumulo/server/metrics/ProcessMetrics.java new file mode 100644 index 0000000000..4ebbeb22a2 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/metrics/ProcessMetrics.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.metrics; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.accumulo.core.metrics.MetricsProducer; + +import io.micrometer.core.instrument.MeterRegistry; + +public class ProcessMetrics implements MetricsProducer { + + private final AtomicInteger isIdle; + + public ProcessMetrics() { + this.isIdle = new AtomicInteger(-1); + } + + @Override + public void registerMetrics(MeterRegistry registry) { + registry.gauge(METRICS_SERVER_IDLE, isIdle, AtomicInteger::get); + } + + public void setIdleValue(boolean isIdle) { + this.isIdle.set(isIdle ? 1 : 0); + } +} diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 63c525fb5a..3039ab9c8a 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -37,6 +37,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAdder; import java.util.function.Supplier; @@ -122,7 +123,6 @@ import com.beust.jcommander.Parameter; import com.google.common.base.Preconditions; import io.micrometer.core.instrument.FunctionCounter; -import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.LongTaskTimer; import io.micrometer.core.instrument.MeterRegistry; @@ -188,6 +188,7 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac @Override public void registerMetrics(MeterRegistry registry) { + super.registerMetrics(registry); FunctionCounter.builder(METRICS_COMPACTOR_ENTRIES_READ, this, Compactor::getTotalEntriesRead) .description("Number of entries read by all compactions that have run on this compactor") .register(registry); @@ -198,13 +199,6 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac LongTaskTimer timer = LongTaskTimer.builder(METRICS_COMPACTOR_MAJC_STUCK) .description("Number and duration of stuck major compactions").register(registry); CompactionWatcher.setTimer(timer); - - Gauge - .builder(METRICS_COMPACTOR_BUSY, this.compactionRunning, - isRunning -> isRunning.get() ? 1 : 0) - .description( - "Indicates if the compactor is busy or not. The value will be 0 when idle and 1 when busy.") - .register(registry); } protected void startGCLogger(ScheduledThreadPoolExecutor schedExecutor) { @@ -708,8 +702,17 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac try { final AtomicReference<Throwable> err = new AtomicReference<>(); + final AtomicLong timeSinceLastCompletion = new AtomicLong(0L); while (!shutdown) { + + idleProcessCheck(() -> { + return timeSinceLastCompletion.get() == 0 + /* Never started a compaction */ || (timeSinceLastCompletion.get() > 0 + && (System.nanoTime() - timeSinceLastCompletion.get()) + > idleReportingPeriodNanos); + }); + currentCompactionId.set(null); err.set(null); JOB_HOLDER.reset(); @@ -858,6 +861,7 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac } } finally { currentCompactionId.set(null); + timeSinceLastCompletion.set(System.nanoTime()); // In the case where there is an error in the foreground code the background compaction // may still be running. Must cancel it before starting another iteration of the loop to // avoid multiple threads updating shared state. diff --git a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java index cc3708f6f4..9e89e024a4 100644 --- a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java +++ b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java @@ -323,6 +323,7 @@ public class CompactorTest { PowerMock.resetAll(); PowerMock.suppress(PowerMock.methods(Halt.class, "halt")); PowerMock.suppress(PowerMock.constructor(AbstractServer.class)); + PowerMock.suppress(PowerMock.methods(AbstractServer.class, "idleProcessCheck")); ServerAddress client = PowerMock.createNiceMock(ServerAddress.class); HostAndPort address = HostAndPort.fromString("localhost:10240"); @@ -373,6 +374,7 @@ public class CompactorTest { PowerMock.resetAll(); PowerMock.suppress(PowerMock.methods(Halt.class, "halt")); PowerMock.suppress(PowerMock.constructor(AbstractServer.class)); + PowerMock.suppress(PowerMock.methods(AbstractServer.class, "idleProcessCheck")); ServerAddress client = PowerMock.createNiceMock(ServerAddress.class); HostAndPort address = HostAndPort.fromString("localhost:10240"); @@ -424,6 +426,7 @@ public class CompactorTest { PowerMock.resetAll(); PowerMock.suppress(PowerMock.methods(Halt.class, "halt")); PowerMock.suppress(PowerMock.constructor(AbstractServer.class)); + PowerMock.suppress(PowerMock.methods(AbstractServer.class, "idleProcessCheck")); ServerAddress client = PowerMock.createNiceMock(ServerAddress.class); HostAndPort address = HostAndPort.fromString("localhost:10240"); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index d3c61fdd0d..d8027fa968 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@ -411,7 +411,7 @@ public class ScanServer extends AbstractServer blockCacheMetrics = new BlockCacheMetrics(resourceManager.getIndexCache(), resourceManager.getDataCache(), resourceManager.getSummaryCache()); - metricsInfo.addMetricsProducers(scanMetrics, scanServerMetrics, blockCacheMetrics); + metricsInfo.addMetricsProducers(this, scanMetrics, scanServerMetrics, blockCacheMetrics); metricsInfo.init(); // We need to set the compaction manager so that we don't get an NPE in CompactableImpl.close @@ -420,6 +420,8 @@ public class ScanServer extends AbstractServer try { while (!serverStopRequested) { UtilWaitThread.sleep(1000); + idleProcessCheck(() -> sessionManager.getActiveScans().isEmpty() + && tabletMetadataCache.estimatedSize() == 0); } } finally { LOG.info("Stopping Thrift Servers"); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 85b0872385..cb4e5f534d 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -771,8 +771,8 @@ public class TabletServer extends AbstractServer implements TabletHostingServer blockCacheMetrics = new BlockCacheMetrics(this.resourceManager.getIndexCache(), this.resourceManager.getDataCache(), this.resourceManager.getSummaryCache()); - metricsInfo.addMetricsProducers(metrics, updateMetrics, scanMetrics, mincMetrics, ceMetrics, - blockCacheMetrics); + metricsInfo.addMetricsProducers(this, metrics, updateMetrics, scanMetrics, mincMetrics, + ceMetrics, blockCacheMetrics); metricsInfo.init(); this.compactionManager = new CompactionManager(() -> Iterators @@ -871,16 +871,20 @@ public class TabletServer extends AbstractServer implements TabletHostingServer HostAndPort managerHost; while (!serverStopRequested) { + + idleProcessCheck(() -> getOnlineTablets().isEmpty()); + // send all of the pending messages try { ManagerMessage mm = null; ManagerClientService.Client iface = null; try { - // wait until a message is ready to send, or a sever stop + // wait until a message is ready to send, or a server stop // was requested while (mm == null && !serverStopRequested) { mm = managerMessages.poll(1, TimeUnit.SECONDS); + idleProcessCheck(() -> getOnlineTablets().isEmpty()); } // have a message to send to the manager, so grab a @@ -908,6 +912,7 @@ public class TabletServer extends AbstractServer implements TabletHostingServer // if any messages are immediately available grab em and // send them mm = managerMessages.poll(); + idleProcessCheck(() -> getOnlineTablets().isEmpty()); } } finally { diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java index 89f887251b..535d50b34b 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java @@ -19,7 +19,6 @@ package org.apache.accumulo.test.compaction; import static java.util.concurrent.TimeUnit.NANOSECONDS; -import static org.apache.accumulo.core.util.UtilWaitThread.sleep; import static org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QUEUE1; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.compact; @@ -39,7 +38,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.accumulo.compactor.Compactor; @@ -218,12 +216,10 @@ public class ExternalCompactionProgressIT extends AccumuloClusterHarness { final AtomicLong totalEntriesRead = new AtomicLong(0); final AtomicLong totalEntriesWritten = new AtomicLong(0); - final AtomicInteger compactorBusy = new AtomicInteger(-1); final long expectedEntriesRead = 9216; final long expectedEntriesWritten = 4096; - Thread checkerThread = - getMetricsCheckerThread(totalEntriesRead, totalEntriesWritten, compactorBusy); + Thread checkerThread = getMetricsCheckerThread(totalEntriesRead, totalEntriesWritten); try (AccumuloClient client = Accumulo.newClient().from(getCluster().getClientProperties()).build()) { @@ -241,13 +237,7 @@ public class ExternalCompactionProgressIT extends AccumuloClusterHarness { EnumSet.of(IteratorUtil.IteratorScope.majc)); log.info("Compacting table"); - Wait.waitFor(() -> compactorBusy.get() == 0, 30_000, CHECKER_THREAD_SLEEP_MS, - "Compactor busy metric should be false initially"); - - compact(client, table, 2, QUEUE1, false); - - Wait.waitFor(() -> compactorBusy.get() == 1, 30_000, CHECKER_THREAD_SLEEP_MS, - "Compactor busy metric should be true after starting compaction"); + compact(client, table, 2, QUEUE1, true); Wait.waitFor(() -> { if (totalEntriesRead.get() == expectedEntriesRead @@ -262,9 +252,6 @@ public class ExternalCompactionProgressIT extends AccumuloClusterHarness { }, 30_000, CHECKER_THREAD_SLEEP_MS, "Entries read and written metrics values did not match expected values"); - Wait.waitFor(() -> compactorBusy.get() == 0, 30_000, CHECKER_THREAD_SLEEP_MS, - "Compactor busy metric should be false once compaction completes"); - log.info("Done Compacting table"); verify(client, table, 2, ROWS); } finally { @@ -280,10 +267,9 @@ public class ExternalCompactionProgressIT extends AccumuloClusterHarness { * * @param totalEntriesRead this is set to the value of the entries read metric * @param totalEntriesWritten this is set to the value of the entries written metric - * @param compactorBusy this is set to the value of the compactor busy metric */ private static Thread getMetricsCheckerThread(AtomicLong totalEntriesRead, - AtomicLong totalEntriesWritten, AtomicInteger compactorBusy) { + AtomicLong totalEntriesWritten) { return Threads.createThread("metric-tailer", () -> { log.info("Starting metric tailer"); @@ -308,9 +294,6 @@ public class ExternalCompactionProgressIT extends AccumuloClusterHarness { case MetricsProducer.METRICS_COMPACTOR_ENTRIES_WRITTEN: totalEntriesWritten.addAndGet(value); break; - case MetricsProducer.METRICS_COMPACTOR_BUSY: - compactorBusy.set(value); - break; } } sleepUninterruptibly(CHECKER_THREAD_SLEEP_MS, TimeUnit.MILLISECONDS); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/IdleProcessMetricsIT.java b/test/src/main/java/org/apache/accumulo/test/functional/IdleProcessMetricsIT.java new file mode 100644 index 0000000000..6366d16ae9 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/functional/IdleProcessMetricsIT.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.functional; + +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QUEUE1; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.accumulo.compactor.Compactor; +import org.apache.accumulo.coordinator.CompactionCoordinator; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.metrics.MetricsProducer; +import org.apache.accumulo.harness.MiniClusterConfigurationCallback; +import org.apache.accumulo.harness.SharedMiniClusterBase; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.compaction.ExternalCompactionTestUtils; +import org.apache.accumulo.test.metrics.TestStatsDRegistryFactory; +import org.apache.accumulo.test.metrics.TestStatsDSink; +import org.apache.accumulo.test.util.Wait; +import org.apache.hadoop.conf.Configuration; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class IdleProcessMetricsIT extends SharedMiniClusterBase { + + private static final Logger log = LoggerFactory.getLogger(IdleProcessMetricsIT.class); + + static final Duration idleProcessInterval = Duration.ofSeconds(10); + + public static class IdleStopITConfig implements MiniClusterConfigurationCallback { + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) { + ExternalCompactionTestUtils.configureMiniCluster(cfg, coreSite); + cfg.setNumCompactors(1); + cfg.setNumTservers(1); + cfg.setNumScanServers(1); + + cfg.setProperty(Property.GENERAL_IDLE_PROCESS_INTERVAL, + idleProcessInterval.toSeconds() + "s"); + + // Tell the server processes to use a StatsDMeterRegistry that will be configured + // to push all metrics to the sink we started. + cfg.setProperty(Property.GENERAL_MICROMETER_ENABLED, "true"); + cfg.setProperty(Property.GENERAL_MICROMETER_FACTORY, + TestStatsDRegistryFactory.class.getName()); + Map<String,String> sysProps = Map.of(TestStatsDRegistryFactory.SERVER_HOST, "127.0.0.1", + TestStatsDRegistryFactory.SERVER_PORT, Integer.toString(sink.getPort())); + cfg.setSystemProperties(sysProps); + + } + + } + + private static TestStatsDSink sink; + + @BeforeAll + public static void before() throws Exception { + sink = new TestStatsDSink(); + SharedMiniClusterBase.startMiniClusterWithConfig(new IdleStopITConfig()); + } + + @AfterAll + public static void after() throws Exception { + sink.close(); + SharedMiniClusterBase.stopMiniCluster(); + } + + @Test + public void testIdleStopMetrics() throws Exception { + + getCluster().getClusterControl().startCoordinator(CompactionCoordinator.class); + getCluster().getClusterControl().startCompactors(Compactor.class, 1, QUEUE1); + getCluster().getClusterControl().start(ServerType.SCAN_SERVER, "localhost"); + getCluster().getClusterControl().start(ServerType.TABLET_SERVER); + + // should emit the idle metric after the configured duration of GENERAL_IDLE_PROCESS_INTERVAL + Thread.sleep(idleProcessInterval.toMillis()); + + AtomicBoolean sawCompactor = new AtomicBoolean(false); + AtomicBoolean sawSServer = new AtomicBoolean(false); + AtomicBoolean sawTServer = new AtomicBoolean(false); + Wait.waitFor(() -> { + List<String> statsDMetrics = sink.getLines(); + statsDMetrics.stream().filter(line -> line.startsWith(MetricsProducer.METRICS_SERVER_IDLE)) + .peek(log::info).map(TestStatsDSink::parseStatsDMetric).forEach(a -> { + String processName = a.getTags().get("process.name"); + int value = Integer.parseInt(a.getValue()); + assertTrue(value == 0 || value == 1 || value == -1, "Unexpected value " + value); + if ("tserver".equals(processName) && value == 0) { + // Expect tserver to never be idle + sawTServer.set(true); + } else if ("sserver".equals(processName) && value == 1) { + // Expect scan server to be idle + sawSServer.set(true); + } else if ("compactor".equals(processName) && value == 1) { + // Expect compactor to be idle + sawCompactor.set(true); + } + + }); + return sawCompactor.get() && sawSServer.get() && sawTServer.get(); + }); + } + +} diff --git a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java index 5a46507db1..fc0ecdb881 100644 --- a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java @@ -102,13 +102,13 @@ public class MetricsIT extends ConfigurableMacBase implements MetricsProducer { // @formatter:off Set<String> unexpectedMetrics = Set.of(METRICS_COMPACTOR_MAJC_STUCK, - METRICS_COMPACTOR_BUSY, METRICS_REPLICATION_QUEUE, METRICS_SCAN_YIELDS, METRICS_UPDATE_ERRORS); // add sserver as flaky until scan server included in mini tests. Set<String> flakyMetrics = Set.of(METRICS_FATE_TYPE_IN_PROGRESS, + METRICS_SERVER_IDLE, METRICS_SCAN_BUSY_TIMEOUT_COUNTER, METRICS_SCAN_RESERVATION_CONFLICT_COUNTER, METRICS_SCAN_RESERVATION_TOTAL_TIMER,