This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 5ab0bd8273c0880800b04b1fa53224c85bfc4a99
Merge: 1e347f8482 ffbbca7de6
Author: Dom Garguilo <domgargu...@apache.org>
AuthorDate: Tue Jul 9 16:20:31 2024 -0400

    Merge remote-tracking branch 'upstream/2.1'

 .../org/apache/accumulo/core/conf/Property.java    |   3 +
 .../accumulo/core/metrics/MetricsProducer.java     |  18 +--
 .../spi/balancer/HostRegexTableLoadBalancer.java   |   3 +-
 .../org/apache/accumulo/server/AbstractServer.java |  27 ++++-
 .../accumulo/server/metrics/ProcessMetrics.java    |   8 ++
 .../org/apache/accumulo/compactor/Compactor.java   |  19 +--
 .../org/apache/accumulo/tserver/ScanServer.java    |   2 +
 .../org/apache/accumulo/tserver/TabletServer.java  |   7 +-
 .../compaction/ExternalCompactionProgressIT.java   |  22 +---
 .../test/functional/IdleProcessMetricsIT.java      | 130 +++++++++++++++++++++
 .../apache/accumulo/test/metrics/MetricsIT.java    |   2 +-
 11 files changed, 201 insertions(+), 40 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/conf/Property.java
index aa6533bb53,629585058a..ba65025f1d
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@@ -291,29 -294,9 +291,32 @@@ public enum Property 
    GENERAL_DELEGATION_TOKEN_UPDATE_INTERVAL("general.delegation.token.update.interval", "1d",
        PropertyType.TIMEDURATION, "The length of time between generation of new secret keys.",
        "1.7.0"),
+   GENERAL_IDLE_PROCESS_INTERVAL("general.metrics.process.idle", "5m", PropertyType.TIMEDURATION,
+       "Amount of time a process must be idle before it is considered to be idle by the metrics system.",
+       "2.1.3"),
 +  GENERAL_LOW_MEM_DETECTOR_INTERVAL("general.low.mem.detector.interval", "5s",
 +      PropertyType.TIMEDURATION, "The time interval between low memory checks.", "3.0.0"),
 +  GENERAL_LOW_MEM_DETECTOR_THRESHOLD("general.low.mem.detector.threshold", "0.05",
 +      PropertyType.FRACTION,
 +      "The LowMemoryDetector will report when free memory drops below this percentage of total memory.",
 +      "3.0.0"),
 +  GENERAL_LOW_MEM_SCAN_PROTECTION("general.low.mem.protection.scan", "false", PropertyType.BOOLEAN,
 +      "Scans may be paused or return results early when the server "
 +          + "is low on memory and this property is set to true. Enabling this property will incur a slight "
 +          + "scan performance penalty when the server is not low on memory.",
 +      "3.0.0"),
 +  GENERAL_LOW_MEM_MINC_PROTECTION("general.low.mem.protection.compaction.minc", "false",
 +      PropertyType.BOOLEAN,
 +      "Minor compactions may be paused when the server "
 +          + "is low on memory and this property is set to true. Enabling this property will incur a slight "
 +          + "compaction performance penalty when the server is not low on memory.",
 +      "3.0.0"),
 +  GENERAL_LOW_MEM_MAJC_PROTECTION("general.low.mem.protection.compaction.majc", "false",
 +      PropertyType.BOOLEAN,
 +      "Major compactions may be paused when the server "
 +          + "is low on memory and this property is set to true. Enabling this property will incur a slight "
 +          + "compaction performance penalty when the server is not low on memory.",
 +      "3.0.0"),
    GENERAL_MAX_SCANNER_RETRY_PERIOD("general.max.scanner.retry.period", "5s",
        PropertyType.TIMEDURATION,
        "The maximum amount of time that a Scanner should wait before retrying a failed RPC.",
diff --cc core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
index 0f8e607ee6,8cf2ffc956..ee6eb6e891
--- a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
+++ b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
@@@ -590,7 -604,8 +589,9 @@@ public interface MetricsProducer 
  
    Logger LOG = LoggerFactory.getLogger(MetricsProducer.class);
  
 +  String METRICS_LOW_MEMORY = "accumulo.detected.low.memory";
+   String METRICS_SERVER_IDLE = "accumulo.server.idle";
+ 
    String METRICS_COMPACTOR_PREFIX = "accumulo.compactor.";
    String METRICS_COMPACTOR_MAJC_STUCK = METRICS_COMPACTOR_PREFIX + "majc.stuck";
    String METRICS_COMPACTOR_ENTRIES_READ = METRICS_COMPACTOR_PREFIX + "entries.read";
diff --cc server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
index bd66689d0f,af679c13d3..d6cacdd19a
--- a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
@@@ -18,9 -18,10 +18,10 @@@
   */
  package org.apache.accumulo.server;
  
 -import java.util.Objects;
 +import java.util.concurrent.ScheduledFuture;
  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.atomic.AtomicReference;
+ import java.util.function.Supplier;
  
  import org.apache.accumulo.core.Constants;
  import org.apache.accumulo.core.classloader.ClassLoaderUtil;
@@@ -43,17 -41,19 +44,18 @@@ public abstract class AbstractServer im
    private final ServerContext context;
    protected final String applicationName;
    private final String hostname;
- 
 -  private final Logger log;
    private final ProcessMetrics processMetrics;
+   protected final long idleReportingPeriodNanos;
+   private volatile long idlePeriodStartNanos = 0L;
  
 -  protected AbstractServer(String appName, ServerOpts opts, String[] args) {
 -    this.log = LoggerFactory.getLogger(getClass().getName());
 +  protected AbstractServer(String appName, ConfigOpts opts, String[] args) {
      this.applicationName = appName;
      opts.parseArgs(appName, args);
 -    this.hostname = Objects.requireNonNull(opts.getAddress());
      var siteConfig = opts.getSiteConfiguration();
 +    this.hostname = siteConfig.get(Property.GENERAL_PROCESS_BIND_ADDRESS);
      SecurityUtil.serverLogin(siteConfig);
      context = new ServerContext(siteConfig);
 +    Logger log = LoggerFactory.getLogger(getClass());
      log.info("Version " + Constants.VERSION);
      log.info("Instance " + context.getInstanceID());
      context.init(appName);
@@@ -63,12 -63,30 +65,35 @@@
        // Server-side "client" check to make sure we're logged in as a user we expect to be
        context.enforceKerberosLogin();
      }
 -    processMetrics = new ProcessMetrics();
 +    final LowMemoryDetector lmd = context.getLowMemoryDetector();
 +    ScheduledFuture<?> future = context.getScheduledExecutor().scheduleWithFixedDelay(
 +        () -> lmd.logGCInfo(context.getConfiguration()), 0,
 +        lmd.getIntervalMillis(context.getConfiguration()), TimeUnit.MILLISECONDS);
 +    ThreadPools.watchNonCriticalScheduledTask(future);
 +    processMetrics = new ProcessMetrics(context);
+     idleReportingPeriodNanos = TimeUnit.MILLISECONDS.toNanos(
+         context.getConfiguration().getTimeInMillis(Property.GENERAL_IDLE_PROCESS_INTERVAL));
+   }
+ 
+   protected void idleProcessCheck(Supplier<Boolean> idleCondition) {
+     boolean isIdle = idleCondition.get();
+     boolean shouldResetIdlePeriod = !isIdle || idleReportingPeriodNanos == 0;
+     boolean isIdlePeriodNotStarted = idlePeriodStartNanos == 0;
+     boolean hasExceededIdlePeriod =
+         (System.nanoTime() - idlePeriodStartNanos) > idleReportingPeriodNanos;
+ 
+     if (shouldResetIdlePeriod) {
+       // Reset idle period and set idle metric to false
+       idlePeriodStartNanos = 0;
+       processMetrics.setIdleValue(false);
+     } else if (isIdlePeriodNotStarted) {
+       // Start tracking idle period
+       idlePeriodStartNanos = System.nanoTime();
+     } else if (hasExceededIdlePeriod) {
+       // Set idle metric to true and reset the start of the idle period
+       processMetrics.setIdleValue(true);
+       idlePeriodStartNanos = 0;
+     }
    }
  
    /**
diff --cc server/base/src/main/java/org/apache/accumulo/server/metrics/ProcessMetrics.java
index 8b44525c53,4ebbeb22a2..69de547e0b
--- a/server/base/src/main/java/org/apache/accumulo/server/metrics/ProcessMetrics.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/metrics/ProcessMetrics.java
@@@ -18,27 -18,26 +18,35 @@@
   */
  package org.apache.accumulo.server.metrics;
  
 +import java.util.List;
+ import java.util.concurrent.atomic.AtomicInteger;
  
  import org.apache.accumulo.core.metrics.MetricsProducer;
 +import org.apache.accumulo.server.ServerContext;
  
  import io.micrometer.core.instrument.MeterRegistry;
  
  public class ProcessMetrics implements MetricsProducer {
  
 +  private final ServerContext context;
+   private final AtomicInteger isIdle;
  
 -  public ProcessMetrics() {
 +  public ProcessMetrics(final ServerContext context) {
 +    this.context = context;
+     this.isIdle = new AtomicInteger(-1);
    }
  
    @Override
    public void registerMetrics(MeterRegistry registry) {
 +    registry.gauge(METRICS_LOW_MEMORY, List.of(), this, this::lowMemDetected);
+     registry.gauge(METRICS_SERVER_IDLE, isIdle, AtomicInteger::get);
    }
  
 +  private int lowMemDetected(ProcessMetrics processMetrics) {
 +    return context.getLowMemoryDetector().isRunningLowOnMemory() ? 1 : 0;
 +  }
++
+   public void setIdleValue(boolean isIdle) {
+     this.isIdle.set(isIdle ? 1 : 0);
+   }
  }
diff --cc server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 57f9f87978,3039ab9c8a..5fabe8df5a
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@@ -121,11 -119,10 +122,10 @@@ import org.apache.zookeeper.KeeperExcep
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
 -import com.beust.jcommander.Parameter;
  import com.google.common.base.Preconditions;
 +import com.google.common.net.HostAndPort;
  
  import io.micrometer.core.instrument.FunctionCounter;
- import io.micrometer.core.instrument.Gauge;
  import io.micrometer.core.instrument.LongTaskTimer;
  import io.micrometer.core.instrument.MeterRegistry;
  
@@@ -190,15 -199,15 +190,8 @@@ public class Compactor extends Abstract
      LongTaskTimer timer = LongTaskTimer.builder(METRICS_COMPACTOR_MAJC_STUCK)
          .description("Number and duration of stuck major compactions").register(registry);
      CompactionWatcher.setTimer(timer);
- 
-     Gauge
-         .builder(METRICS_COMPACTOR_BUSY, this.compactionRunning,
-             isRunning -> isRunning.get() ? 1 : 0)
-         .description(
-             "Indicates if the compactor is busy or not. The value will be 0 when idle and 1 when busy.")
-         .register(registry);
    }
  
 -  protected void startGCLogger(ScheduledThreadPoolExecutor schedExecutor) {
 -    ScheduledFuture<?> future =
 -        schedExecutor.scheduleWithFixedDelay(() -> gcLogger.logGCInfo(getConfiguration()), 0,
 -            TIME_BETWEEN_GC_CHECKS, TimeUnit.MILLISECONDS);
 -    ThreadPools.watchNonCriticalScheduledTask(future);
 -  }
 -
    protected void startCancelChecker(ScheduledThreadPoolExecutor schedExecutor,
        long timeBetweenChecks) {
      ThreadPools.watchCriticalScheduledTask(schedExecutor.scheduleWithFixedDelay(
diff --cc test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
index 2a26f6eb16,fc0ecdb881..b12645d815
--- a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
@@@ -102,8 -102,9 +102,7 @@@ public class MetricsIT extends Configur
      // @formatter:off
      Set<String> unexpectedMetrics =
              Set.of(METRICS_COMPACTOR_MAJC_STUCK,
-                     METRICS_COMPACTOR_BUSY,
 -                    METRICS_REPLICATION_QUEUE,
 -                    METRICS_SCAN_YIELDS,
 -                    METRICS_UPDATE_ERRORS);
 +                    METRICS_SCAN_YIELDS);
  
      // add sserver as flaky until scan server included in mini tests.
      Set<String> flakyMetrics = Set.of(METRICS_FATE_TYPE_IN_PROGRESS,

Reply via email to