This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new ffbbca7de6 backport server idle metric to 2.1 (#4716)
ffbbca7de6 is described below

commit ffbbca7de6bf45c84788d185180774cb2b882698
Author: Dom G <domgargu...@apache.org>
AuthorDate: Tue Jul 9 15:41:13 2024 -0400

    backport server idle metric to 2.1 (#4716)
    
    * Backport changes from idle stop PR #4078
    * Convert idle metric from Counter to Gauge
    * Remove the COMPACTOR_BUSY metric as a duplicate of this one
    
    ---------
    
    Co-authored-by: Dave Marion <dlmar...@apache.org>
---
 .../org/apache/accumulo/core/conf/Property.java    |   3 +
 .../accumulo/core/metrics/MetricsProducer.java     |  18 +--
 .../org/apache/accumulo/server/AbstractServer.java |  44 ++++++-
 .../accumulo/server/metrics/ProcessMetrics.java    |  43 +++++++
 .../org/apache/accumulo/compactor/Compactor.java   |  20 ++--
 .../apache/accumulo/compactor/CompactorTest.java   |   3 +
 .../org/apache/accumulo/tserver/ScanServer.java    |   4 +-
 .../org/apache/accumulo/tserver/TabletServer.java  |  11 +-
 .../compaction/ExternalCompactionProgressIT.java   |  23 +---
 .../test/functional/IdleProcessMetricsIT.java      | 130 +++++++++++++++++++++
 .../apache/accumulo/test/metrics/MetricsIT.java    |   2 +-
 11 files changed, 258 insertions(+), 43 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java 
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index fe4b8f4c18..629585058a 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -294,6 +294,9 @@ public enum Property {
   
GENERAL_DELEGATION_TOKEN_UPDATE_INTERVAL("general.delegation.token.update.interval",
 "1d",
       PropertyType.TIMEDURATION, "The length of time between generation of new 
secret keys.",
       "1.7.0"),
+  GENERAL_IDLE_PROCESS_INTERVAL("general.metrics.process.idle", "5m", 
PropertyType.TIMEDURATION,
+      "Amount of time a process must be idle before it is considered to be 
idle by the metrics system.",
+      "2.1.3"),
   GENERAL_MAX_SCANNER_RETRY_PERIOD("general.max.scanner.retry.period", "5s",
       PropertyType.TIMEDURATION,
       "The maximum amount of time that a Scanner should wait before retrying a 
failed RPC.",
diff --git 
a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java 
b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
index 3fdb1a4309..8cf2ffc956 100644
--- a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
+++ b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
@@ -44,6 +44,13 @@ import io.micrometer.core.instrument.MeterRegistry;
  * <th>Micrometer Type</th>
  * <th>Notes</th>
  * </tr>
+ * <!-- general server metrics -->
+ * <tr>
+ * <td>N/A</td>
+ * <td>N/A</td>
+ * <td>{@value #METRICS_SERVER_IDLE}</td>
+ * <td>Gauge</td>
+ * <td>Indicates if the server is idle or not. The value will be 1 when idle 
and 0 when not idle.</td></tr>
  * <!-- compactor -->
  * <tr>
  * <td>N/A</td>
@@ -66,14 +73,6 @@ import io.micrometer.core.instrument.MeterRegistry;
  * <td>FunctionCounter</td>
  * <td>Number of entries written by all threads performing compactions</td>
  * </tr>
- * <tr>
- * <td>N/A</td>
- * <td>N/A</td>
- * <td>{@value #METRICS_COMPACTOR_BUSY}</td>
- * <td>Gauge</td>
- * <td>Indicates if the compactor is busy or not. The value will be 0 when 
idle and 1 when
- * busy.</td>
- * </tr>
  * <!-- fate -->
  * <tr>
  * <td>currentFateOps</td>
@@ -605,11 +604,12 @@ public interface MetricsProducer {
 
   Logger LOG = LoggerFactory.getLogger(MetricsProducer.class);
 
+  String METRICS_SERVER_IDLE = "accumulo.server.idle";
+
   String METRICS_COMPACTOR_PREFIX = "accumulo.compactor.";
   String METRICS_COMPACTOR_MAJC_STUCK = METRICS_COMPACTOR_PREFIX + 
"majc.stuck";
   String METRICS_COMPACTOR_ENTRIES_READ = METRICS_COMPACTOR_PREFIX + 
"entries.read";
   String METRICS_COMPACTOR_ENTRIES_WRITTEN = METRICS_COMPACTOR_PREFIX + 
"entries.written";
-  String METRICS_COMPACTOR_BUSY = METRICS_COMPACTOR_PREFIX + "busy";
 
   String METRICS_FATE_PREFIX = "accumulo.fate.";
   String METRICS_FATE_TYPE_IN_PROGRESS = METRICS_FATE_PREFIX + 
"ops.in.progress.by.type";
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java 
b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
index ac1bcaab90..af679c13d3 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
@@ -19,22 +19,32 @@
 package org.apache.accumulo.server;
 
 import java.util.Objects;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.classloader.ClassLoaderUtil;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.metrics.MetricsProducer;
 import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.server.metrics.ProcessMetrics;
 import org.apache.accumulo.server.security.SecurityUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public abstract class AbstractServer implements AutoCloseable, Runnable {
+import io.micrometer.core.instrument.MeterRegistry;
+
+public abstract class AbstractServer implements AutoCloseable, 
MetricsProducer, Runnable {
 
   private final ServerContext context;
   protected final String applicationName;
   private final String hostname;
   private final Logger log;
+  private final ProcessMetrics processMetrics;
+  protected final long idleReportingPeriodNanos;
+  private volatile long idlePeriodStartNanos = 0L;
 
   protected AbstractServer(String appName, ServerOpts opts, String[] args) {
     this.log = LoggerFactory.getLogger(getClass().getName());
@@ -53,6 +63,30 @@ public abstract class AbstractServer implements 
AutoCloseable, Runnable {
       // Server-side "client" check to make sure we're logged in as a user we 
expect to be
       context.enforceKerberosLogin();
     }
+    processMetrics = new ProcessMetrics();
+    idleReportingPeriodNanos = TimeUnit.MILLISECONDS.toNanos(
+        
context.getConfiguration().getTimeInMillis(Property.GENERAL_IDLE_PROCESS_INTERVAL));
+  }
+
+  protected void idleProcessCheck(Supplier<Boolean> idleCondition) {
+    boolean isIdle = idleCondition.get();
+    boolean shouldResetIdlePeriod = !isIdle || idleReportingPeriodNanos == 0;
+    boolean isIdlePeriodNotStarted = idlePeriodStartNanos == 0;
+    boolean hasExceededIdlePeriod =
+        (System.nanoTime() - idlePeriodStartNanos) > idleReportingPeriodNanos;
+
+    if (shouldResetIdlePeriod) {
+      // Reset idle period and set idle metric to false
+      idlePeriodStartNanos = 0;
+      processMetrics.setIdleValue(false);
+    } else if (isIdlePeriodNotStarted) {
+      // Start tracking idle period
+      idlePeriodStartNanos = System.nanoTime();
+    } else if (hasExceededIdlePeriod) {
+      // Set idle metric to true and reset the start of the idle period
+      processMetrics.setIdleValue(true);
+      idlePeriodStartNanos = 0;
+    }
   }
 
   /**
@@ -76,6 +110,14 @@ public abstract class AbstractServer implements 
AutoCloseable, Runnable {
     }
   }
 
+  @Override
+  public void registerMetrics(MeterRegistry registry) {
+    // makes mocking subclasses easier
+    if (processMetrics != null) {
+      processMetrics.registerMetrics(registry);
+    }
+  }
+
   public String getHostname() {
     return hostname;
   }
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/metrics/ProcessMetrics.java
 
b/server/base/src/main/java/org/apache/accumulo/server/metrics/ProcessMetrics.java
new file mode 100644
index 0000000000..4ebbeb22a2
--- /dev/null
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/metrics/ProcessMetrics.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.metrics;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.accumulo.core.metrics.MetricsProducer;
+
+import io.micrometer.core.instrument.MeterRegistry;
+
+public class ProcessMetrics implements MetricsProducer {
+
+  private final AtomicInteger isIdle;
+
+  public ProcessMetrics() {
+    this.isIdle = new AtomicInteger(-1);
+  }
+
+  @Override
+  public void registerMetrics(MeterRegistry registry) {
+    registry.gauge(METRICS_SERVER_IDLE, isIdle, AtomicInteger::get);
+  }
+
+  public void setIdleValue(boolean isIdle) {
+    this.isIdle.set(isIdle ? 1 : 0);
+  }
+}
diff --git 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 63c525fb5a..3039ab9c8a 100644
--- 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -37,6 +37,7 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.function.Supplier;
@@ -122,7 +123,6 @@ import com.beust.jcommander.Parameter;
 import com.google.common.base.Preconditions;
 
 import io.micrometer.core.instrument.FunctionCounter;
-import io.micrometer.core.instrument.Gauge;
 import io.micrometer.core.instrument.LongTaskTimer;
 import io.micrometer.core.instrument.MeterRegistry;
 
@@ -188,6 +188,7 @@ public class Compactor extends AbstractServer implements 
MetricsProducer, Compac
 
   @Override
   public void registerMetrics(MeterRegistry registry) {
+    super.registerMetrics(registry);
     FunctionCounter.builder(METRICS_COMPACTOR_ENTRIES_READ, this, 
Compactor::getTotalEntriesRead)
         .description("Number of entries read by all compactions that have run 
on this compactor")
         .register(registry);
@@ -198,13 +199,6 @@ public class Compactor extends AbstractServer implements 
MetricsProducer, Compac
     LongTaskTimer timer = LongTaskTimer.builder(METRICS_COMPACTOR_MAJC_STUCK)
         .description("Number and duration of stuck major 
compactions").register(registry);
     CompactionWatcher.setTimer(timer);
-
-    Gauge
-        .builder(METRICS_COMPACTOR_BUSY, this.compactionRunning,
-            isRunning -> isRunning.get() ? 1 : 0)
-        .description(
-            "Indicates if the compactor is busy or not. The value will be 0 
when idle and 1 when busy.")
-        .register(registry);
   }
 
   protected void startGCLogger(ScheduledThreadPoolExecutor schedExecutor) {
@@ -708,8 +702,17 @@ public class Compactor extends AbstractServer implements 
MetricsProducer, Compac
     try {
 
       final AtomicReference<Throwable> err = new AtomicReference<>();
+      final AtomicLong timeSinceLastCompletion = new AtomicLong(0L);
 
       while (!shutdown) {
+
+        idleProcessCheck(() -> {
+          return timeSinceLastCompletion.get() == 0
+              /* Never started a compaction */ || 
(timeSinceLastCompletion.get() > 0
+                  && (System.nanoTime() - timeSinceLastCompletion.get())
+                      > idleReportingPeriodNanos);
+        });
+
         currentCompactionId.set(null);
         err.set(null);
         JOB_HOLDER.reset();
@@ -858,6 +861,7 @@ public class Compactor extends AbstractServer implements 
MetricsProducer, Compac
           }
         } finally {
           currentCompactionId.set(null);
+          timeSinceLastCompletion.set(System.nanoTime());
           // In the case where there is an error in the foreground code the 
background compaction
           // may still be running. Must cancel it before starting another 
iteration of the loop to
           // avoid multiple threads updating shared state.
diff --git 
a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
 
b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
index cc3708f6f4..9e89e024a4 100644
--- 
a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
+++ 
b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
@@ -323,6 +323,7 @@ public class CompactorTest {
     PowerMock.resetAll();
     PowerMock.suppress(PowerMock.methods(Halt.class, "halt"));
     PowerMock.suppress(PowerMock.constructor(AbstractServer.class));
+    PowerMock.suppress(PowerMock.methods(AbstractServer.class, 
"idleProcessCheck"));
 
     ServerAddress client = PowerMock.createNiceMock(ServerAddress.class);
     HostAndPort address = HostAndPort.fromString("localhost:10240");
@@ -373,6 +374,7 @@ public class CompactorTest {
     PowerMock.resetAll();
     PowerMock.suppress(PowerMock.methods(Halt.class, "halt"));
     PowerMock.suppress(PowerMock.constructor(AbstractServer.class));
+    PowerMock.suppress(PowerMock.methods(AbstractServer.class, 
"idleProcessCheck"));
 
     ServerAddress client = PowerMock.createNiceMock(ServerAddress.class);
     HostAndPort address = HostAndPort.fromString("localhost:10240");
@@ -424,6 +426,7 @@ public class CompactorTest {
     PowerMock.resetAll();
     PowerMock.suppress(PowerMock.methods(Halt.class, "halt"));
     PowerMock.suppress(PowerMock.constructor(AbstractServer.class));
+    PowerMock.suppress(PowerMock.methods(AbstractServer.class, 
"idleProcessCheck"));
 
     ServerAddress client = PowerMock.createNiceMock(ServerAddress.class);
     HostAndPort address = HostAndPort.fromString("localhost:10240");
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
index d3c61fdd0d..d8027fa968 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
@@ -411,7 +411,7 @@ public class ScanServer extends AbstractServer
     blockCacheMetrics = new BlockCacheMetrics(resourceManager.getIndexCache(),
         resourceManager.getDataCache(), resourceManager.getSummaryCache());
 
-    metricsInfo.addMetricsProducers(scanMetrics, scanServerMetrics, 
blockCacheMetrics);
+    metricsInfo.addMetricsProducers(this, scanMetrics, scanServerMetrics, 
blockCacheMetrics);
     metricsInfo.init();
     // We need to set the compaction manager so that we don't get an NPE in 
CompactableImpl.close
 
@@ -420,6 +420,8 @@ public class ScanServer extends AbstractServer
     try {
       while (!serverStopRequested) {
         UtilWaitThread.sleep(1000);
+        idleProcessCheck(() -> sessionManager.getActiveScans().isEmpty()
+            && tabletMetadataCache.estimatedSize() == 0);
       }
     } finally {
       LOG.info("Stopping Thrift Servers");
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 85b0872385..cb4e5f534d 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -771,8 +771,8 @@ public class TabletServer extends AbstractServer implements 
TabletHostingServer
     blockCacheMetrics = new 
BlockCacheMetrics(this.resourceManager.getIndexCache(),
         this.resourceManager.getDataCache(), 
this.resourceManager.getSummaryCache());
 
-    metricsInfo.addMetricsProducers(metrics, updateMetrics, scanMetrics, 
mincMetrics, ceMetrics,
-        blockCacheMetrics);
+    metricsInfo.addMetricsProducers(this, metrics, updateMetrics, scanMetrics, 
mincMetrics,
+        ceMetrics, blockCacheMetrics);
     metricsInfo.init();
 
     this.compactionManager = new CompactionManager(() -> Iterators
@@ -871,16 +871,20 @@ public class TabletServer extends AbstractServer 
implements TabletHostingServer
 
     HostAndPort managerHost;
     while (!serverStopRequested) {
+
+      idleProcessCheck(() -> getOnlineTablets().isEmpty());
+
       // send all of the pending messages
       try {
         ManagerMessage mm = null;
         ManagerClientService.Client iface = null;
 
         try {
-          // wait until a message is ready to send, or a sever stop
+          // wait until a message is ready to send, or a server stop
           // was requested
           while (mm == null && !serverStopRequested) {
             mm = managerMessages.poll(1, TimeUnit.SECONDS);
+            idleProcessCheck(() -> getOnlineTablets().isEmpty());
           }
 
           // have a message to send to the manager, so grab a
@@ -908,6 +912,7 @@ public class TabletServer extends AbstractServer implements 
TabletHostingServer
             // if any messages are immediately available grab em and
             // send them
             mm = managerMessages.poll();
+            idleProcessCheck(() -> getOnlineTablets().isEmpty());
           }
 
         } finally {
diff --git 
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
 
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
index 89f887251b..535d50b34b 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
@@ -19,7 +19,6 @@
 package org.apache.accumulo.test.compaction;
 
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
-import static org.apache.accumulo.core.util.UtilWaitThread.sleep;
 import static 
org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly;
 import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QUEUE1;
 import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.compact;
@@ -39,7 +38,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.accumulo.compactor.Compactor;
@@ -218,12 +216,10 @@ public class ExternalCompactionProgressIT extends 
AccumuloClusterHarness {
 
     final AtomicLong totalEntriesRead = new AtomicLong(0);
     final AtomicLong totalEntriesWritten = new AtomicLong(0);
-    final AtomicInteger compactorBusy = new AtomicInteger(-1);
     final long expectedEntriesRead = 9216;
     final long expectedEntriesWritten = 4096;
 
-    Thread checkerThread =
-        getMetricsCheckerThread(totalEntriesRead, totalEntriesWritten, 
compactorBusy);
+    Thread checkerThread = getMetricsCheckerThread(totalEntriesRead, 
totalEntriesWritten);
 
     try (AccumuloClient client =
         Accumulo.newClient().from(getCluster().getClientProperties()).build()) 
{
@@ -241,13 +237,7 @@ public class ExternalCompactionProgressIT extends 
AccumuloClusterHarness {
           EnumSet.of(IteratorUtil.IteratorScope.majc));
       log.info("Compacting table");
 
-      Wait.waitFor(() -> compactorBusy.get() == 0, 30_000, 
CHECKER_THREAD_SLEEP_MS,
-          "Compactor busy metric should be false initially");
-
-      compact(client, table, 2, QUEUE1, false);
-
-      Wait.waitFor(() -> compactorBusy.get() == 1, 30_000, 
CHECKER_THREAD_SLEEP_MS,
-          "Compactor busy metric should be true after starting compaction");
+      compact(client, table, 2, QUEUE1, true);
 
       Wait.waitFor(() -> {
         if (totalEntriesRead.get() == expectedEntriesRead
@@ -262,9 +252,6 @@ public class ExternalCompactionProgressIT extends 
AccumuloClusterHarness {
       }, 30_000, CHECKER_THREAD_SLEEP_MS,
           "Entries read and written metrics values did not match expected 
values");
 
-      Wait.waitFor(() -> compactorBusy.get() == 0, 30_000, 
CHECKER_THREAD_SLEEP_MS,
-          "Compactor busy metric should be false once compaction completes");
-
       log.info("Done Compacting table");
       verify(client, table, 2, ROWS);
     } finally {
@@ -280,10 +267,9 @@ public class ExternalCompactionProgressIT extends 
AccumuloClusterHarness {
    *
    * @param totalEntriesRead this is set to the value of the entries read 
metric
    * @param totalEntriesWritten this is set to the value of the entries 
written metric
-   * @param compactorBusy this is set to the value of the compactor busy metric
    */
   private static Thread getMetricsCheckerThread(AtomicLong totalEntriesRead,
-      AtomicLong totalEntriesWritten, AtomicInteger compactorBusy) {
+      AtomicLong totalEntriesWritten) {
     return Threads.createThread("metric-tailer", () -> {
       log.info("Starting metric tailer");
 
@@ -308,9 +294,6 @@ public class ExternalCompactionProgressIT extends 
AccumuloClusterHarness {
             case MetricsProducer.METRICS_COMPACTOR_ENTRIES_WRITTEN:
               totalEntriesWritten.addAndGet(value);
               break;
-            case MetricsProducer.METRICS_COMPACTOR_BUSY:
-              compactorBusy.set(value);
-              break;
           }
         }
         sleepUninterruptibly(CHECKER_THREAD_SLEEP_MS, TimeUnit.MILLISECONDS);
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/IdleProcessMetricsIT.java
 
b/test/src/main/java/org/apache/accumulo/test/functional/IdleProcessMetricsIT.java
new file mode 100644
index 0000000000..6366d16ae9
--- /dev/null
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/IdleProcessMetricsIT.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QUEUE1;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.compactor.Compactor;
+import org.apache.accumulo.coordinator.CompactionCoordinator;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.metrics.MetricsProducer;
+import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.compaction.ExternalCompactionTestUtils;
+import org.apache.accumulo.test.metrics.TestStatsDRegistryFactory;
+import org.apache.accumulo.test.metrics.TestStatsDSink;
+import org.apache.accumulo.test.util.Wait;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class IdleProcessMetricsIT extends SharedMiniClusterBase {
+
+  private static final Logger log = 
LoggerFactory.getLogger(IdleProcessMetricsIT.class);
+
+  static final Duration idleProcessInterval = Duration.ofSeconds(10);
+
+  public static class IdleStopITConfig implements 
MiniClusterConfigurationCallback {
+
+    @Override
+    public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration 
coreSite) {
+      ExternalCompactionTestUtils.configureMiniCluster(cfg, coreSite);
+      cfg.setNumCompactors(1);
+      cfg.setNumTservers(1);
+      cfg.setNumScanServers(1);
+
+      cfg.setProperty(Property.GENERAL_IDLE_PROCESS_INTERVAL,
+          idleProcessInterval.toSeconds() + "s");
+
+      // Tell the server processes to use a StatsDMeterRegistry that will be 
configured
+      // to push all metrics to the sink we started.
+      cfg.setProperty(Property.GENERAL_MICROMETER_ENABLED, "true");
+      cfg.setProperty(Property.GENERAL_MICROMETER_FACTORY,
+          TestStatsDRegistryFactory.class.getName());
+      Map<String,String> sysProps = 
Map.of(TestStatsDRegistryFactory.SERVER_HOST, "127.0.0.1",
+          TestStatsDRegistryFactory.SERVER_PORT, 
Integer.toString(sink.getPort()));
+      cfg.setSystemProperties(sysProps);
+
+    }
+
+  }
+
+  private static TestStatsDSink sink;
+
+  @BeforeAll
+  public static void before() throws Exception {
+    sink = new TestStatsDSink();
+    SharedMiniClusterBase.startMiniClusterWithConfig(new IdleStopITConfig());
+  }
+
+  @AfterAll
+  public static void after() throws Exception {
+    sink.close();
+    SharedMiniClusterBase.stopMiniCluster();
+  }
+
+  @Test
+  public void testIdleStopMetrics() throws Exception {
+
+    
getCluster().getClusterControl().startCoordinator(CompactionCoordinator.class);
+    getCluster().getClusterControl().startCompactors(Compactor.class, 1, 
QUEUE1);
+    getCluster().getClusterControl().start(ServerType.SCAN_SERVER, 
"localhost");
+    getCluster().getClusterControl().start(ServerType.TABLET_SERVER);
+
+    // should emit the idle metric after the configured duration of 
GENERAL_IDLE_PROCESS_INTERVAL
+    Thread.sleep(idleProcessInterval.toMillis());
+
+    AtomicBoolean sawCompactor = new AtomicBoolean(false);
+    AtomicBoolean sawSServer = new AtomicBoolean(false);
+    AtomicBoolean sawTServer = new AtomicBoolean(false);
+    Wait.waitFor(() -> {
+      List<String> statsDMetrics = sink.getLines();
+      statsDMetrics.stream().filter(line -> 
line.startsWith(MetricsProducer.METRICS_SERVER_IDLE))
+          .peek(log::info).map(TestStatsDSink::parseStatsDMetric).forEach(a -> 
{
+            String processName = a.getTags().get("process.name");
+            int value = Integer.parseInt(a.getValue());
+            assertTrue(value == 0 || value == 1 || value == -1, "Unexpected 
value " + value);
+            if ("tserver".equals(processName) && value == 0) {
+              // Expect tserver to never be idle
+              sawTServer.set(true);
+            } else if ("sserver".equals(processName) && value == 1) {
+              // Expect scan server to be idle
+              sawSServer.set(true);
+            } else if ("compactor".equals(processName) && value == 1) {
+              // Expect compactor to be idle
+              sawCompactor.set(true);
+            }
+
+          });
+      return sawCompactor.get() && sawSServer.get() && sawTServer.get();
+    });
+  }
+
+}
diff --git a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java 
b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
index 5a46507db1..fc0ecdb881 100644
--- a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
@@ -102,13 +102,13 @@ public class MetricsIT extends ConfigurableMacBase 
implements MetricsProducer {
     // @formatter:off
     Set<String> unexpectedMetrics =
             Set.of(METRICS_COMPACTOR_MAJC_STUCK,
-                    METRICS_COMPACTOR_BUSY,
                     METRICS_REPLICATION_QUEUE,
                     METRICS_SCAN_YIELDS,
                     METRICS_UPDATE_ERRORS);
 
     // add sserver as flaky until scan server included in mini tests.
     Set<String> flakyMetrics = Set.of(METRICS_FATE_TYPE_IN_PROGRESS,
+            METRICS_SERVER_IDLE,
             METRICS_SCAN_BUSY_TIMEOUT_COUNTER,
             METRICS_SCAN_RESERVATION_CONFLICT_COUNTER,
             METRICS_SCAN_RESERVATION_TOTAL_TIMER,

Reply via email to