This is an automated email from the ASF dual-hosted git repository. domgarguilo pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 5ab0bd8273c0880800b04b1fa53224c85bfc4a99 Merge: 1e347f8482 ffbbca7de6 Author: Dom Garguilo <domgargu...@apache.org> AuthorDate: Tue Jul 9 16:20:31 2024 -0400 Merge remote-tracking branch 'upstream/2.1' .../org/apache/accumulo/core/conf/Property.java | 3 + .../accumulo/core/metrics/MetricsProducer.java | 18 +-- .../spi/balancer/HostRegexTableLoadBalancer.java | 3 +- .../org/apache/accumulo/server/AbstractServer.java | 27 ++++- .../accumulo/server/metrics/ProcessMetrics.java | 8 ++ .../org/apache/accumulo/compactor/Compactor.java | 19 +-- .../org/apache/accumulo/tserver/ScanServer.java | 2 + .../org/apache/accumulo/tserver/TabletServer.java | 7 +- .../compaction/ExternalCompactionProgressIT.java | 22 +--- .../test/functional/IdleProcessMetricsIT.java | 130 +++++++++++++++++++++ .../apache/accumulo/test/metrics/MetricsIT.java | 2 +- 11 files changed, 201 insertions(+), 40 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/conf/Property.java index aa6533bb53,629585058a..ba65025f1d --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@@ -291,29 -294,9 +291,32 @@@ public enum Property GENERAL_DELEGATION_TOKEN_UPDATE_INTERVAL("general.delegation.token.update.interval", "1d", PropertyType.TIMEDURATION, "The length of time between generation of new secret keys.", "1.7.0"), + GENERAL_IDLE_PROCESS_INTERVAL("general.metrics.process.idle", "5m", PropertyType.TIMEDURATION, + "Amount of time a process must be idle before it is considered to be idle by the metrics system.", + "2.1.3"), + GENERAL_LOW_MEM_DETECTOR_INTERVAL("general.low.mem.detector.interval", "5s", + PropertyType.TIMEDURATION, "The time interval between low memory checks.", "3.0.0"), + GENERAL_LOW_MEM_DETECTOR_THRESHOLD("general.low.mem.detector.threshold", "0.05", + PropertyType.FRACTION, + "The LowMemoryDetector will report when free memory drops below this percentage of total memory.", + "3.0.0"), + GENERAL_LOW_MEM_SCAN_PROTECTION("general.low.mem.protection.scan", "false", PropertyType.BOOLEAN, + "Scans may be paused or return results early when the server " + + "is low on memory and this property is set to true. Enabling this property will incur a slight " + + "scan performance penalty when the server is not low on memory.", + "3.0.0"), + GENERAL_LOW_MEM_MINC_PROTECTION("general.low.mem.protection.compaction.minc", "false", + PropertyType.BOOLEAN, + "Minor compactions may be paused when the server " + + "is low on memory and this property is set to true. Enabling this property will incur a slight " + + "compaction performance penalty when the server is not low on memory.", + "3.0.0"), + GENERAL_LOW_MEM_MAJC_PROTECTION("general.low.mem.protection.compaction.majc", "false", + PropertyType.BOOLEAN, + "Major compactions may be paused when the server " + + "is low on memory and this property is set to true. Enabling this property will incur a slight " + + "compaction performance penalty when the server is not low on memory.", + "3.0.0"), GENERAL_MAX_SCANNER_RETRY_PERIOD("general.max.scanner.retry.period", "5s", PropertyType.TIMEDURATION, "The maximum amount of time that a Scanner should wait before retrying a failed RPC.", diff --cc core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java index 0f8e607ee6,8cf2ffc956..ee6eb6e891 --- a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java +++ b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java @@@ -590,7 -604,8 +589,9 @@@ public interface MetricsProducer Logger LOG = LoggerFactory.getLogger(MetricsProducer.class); + String METRICS_LOW_MEMORY = "accumulo.detected.low.memory"; + String METRICS_SERVER_IDLE = "accumulo.server.idle"; + String METRICS_COMPACTOR_PREFIX = "accumulo.compactor."; String METRICS_COMPACTOR_MAJC_STUCK = METRICS_COMPACTOR_PREFIX + "majc.stuck"; String METRICS_COMPACTOR_ENTRIES_READ = METRICS_COMPACTOR_PREFIX + "entries.read"; diff --cc server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java index bd66689d0f,af679c13d3..d6cacdd19a --- a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java +++ b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java @@@ -18,9 -18,10 +18,10 @@@ */ package org.apache.accumulo.server; -import java.util.Objects; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; + import java.util.function.Supplier; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.classloader.ClassLoaderUtil; @@@ -43,17 -41,19 +44,18 @@@ public abstract class AbstractServer im private final ServerContext context; protected final String applicationName; private final String hostname; - - private final Logger log; private final ProcessMetrics processMetrics; + protected final long idleReportingPeriodNanos; + private volatile long idlePeriodStartNanos = 0L; - protected AbstractServer(String appName, ServerOpts opts, String[] args) { - this.log = LoggerFactory.getLogger(getClass().getName()); + protected AbstractServer(String appName, ConfigOpts opts, String[] args) { this.applicationName = appName; opts.parseArgs(appName, args); - this.hostname = Objects.requireNonNull(opts.getAddress()); var siteConfig = opts.getSiteConfiguration(); + this.hostname = siteConfig.get(Property.GENERAL_PROCESS_BIND_ADDRESS); SecurityUtil.serverLogin(siteConfig); context = new ServerContext(siteConfig); + Logger log = LoggerFactory.getLogger(getClass()); log.info("Version " + Constants.VERSION); log.info("Instance " + context.getInstanceID()); context.init(appName); @@@ -63,12 -63,30 +65,35 @@@ // Server-side "client" check to make sure we're logged in as a user we expect to be context.enforceKerberosLogin(); } - processMetrics = new ProcessMetrics(); + final LowMemoryDetector lmd = context.getLowMemoryDetector(); + ScheduledFuture<?> future = context.getScheduledExecutor().scheduleWithFixedDelay( + () -> lmd.logGCInfo(context.getConfiguration()), 0, + lmd.getIntervalMillis(context.getConfiguration()), TimeUnit.MILLISECONDS); + ThreadPools.watchNonCriticalScheduledTask(future); + processMetrics = new ProcessMetrics(context); + idleReportingPeriodNanos = TimeUnit.MILLISECONDS.toNanos( + context.getConfiguration().getTimeInMillis(Property.GENERAL_IDLE_PROCESS_INTERVAL)); + } + + protected void idleProcessCheck(Supplier<Boolean> idleCondition) { + boolean isIdle = idleCondition.get(); + boolean shouldResetIdlePeriod = !isIdle || idleReportingPeriodNanos == 0; + boolean isIdlePeriodNotStarted = idlePeriodStartNanos == 0; + boolean hasExceededIdlePeriod = + (System.nanoTime() - idlePeriodStartNanos) > idleReportingPeriodNanos; + + if (shouldResetIdlePeriod) { + // Reset idle period and set idle metric to false + idlePeriodStartNanos = 0; + processMetrics.setIdleValue(false); + } else if (isIdlePeriodNotStarted) { + // Start tracking idle period + idlePeriodStartNanos = System.nanoTime(); + } else if (hasExceededIdlePeriod) { + // Set idle metric to true and reset the start of the idle period + processMetrics.setIdleValue(true); + idlePeriodStartNanos = 0; + } } /** diff --cc server/base/src/main/java/org/apache/accumulo/server/metrics/ProcessMetrics.java index 8b44525c53,4ebbeb22a2..69de547e0b --- a/server/base/src/main/java/org/apache/accumulo/server/metrics/ProcessMetrics.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metrics/ProcessMetrics.java @@@ -18,27 -18,26 +18,35 @@@ */ package org.apache.accumulo.server.metrics; +import java.util.List; + import java.util.concurrent.atomic.AtomicInteger; import org.apache.accumulo.core.metrics.MetricsProducer; +import org.apache.accumulo.server.ServerContext; import io.micrometer.core.instrument.MeterRegistry; public class ProcessMetrics implements MetricsProducer { + private final ServerContext context; + private final AtomicInteger isIdle; - public ProcessMetrics() { + public ProcessMetrics(final ServerContext context) { + this.context = context; + this.isIdle = new AtomicInteger(-1); } @Override public void registerMetrics(MeterRegistry registry) { + registry.gauge(METRICS_LOW_MEMORY, List.of(), this, this::lowMemDetected); + registry.gauge(METRICS_SERVER_IDLE, isIdle, AtomicInteger::get); } + private int lowMemDetected(ProcessMetrics processMetrics) { + return context.getLowMemoryDetector().isRunningLowOnMemory() ? 1 : 0; + } ++ + public void setIdleValue(boolean isIdle) { + this.isIdle.set(isIdle ? 1 : 0); + } } diff --cc server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 57f9f87978,3039ab9c8a..5fabe8df5a --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@@ -121,11 -119,10 +122,10 @@@ import org.apache.zookeeper.KeeperExcep import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.beust.jcommander.Parameter; import com.google.common.base.Preconditions; +import com.google.common.net.HostAndPort; import io.micrometer.core.instrument.FunctionCounter; - import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.LongTaskTimer; import io.micrometer.core.instrument.MeterRegistry; @@@ -190,15 -199,15 +190,8 @@@ public class Compactor extends Abstract LongTaskTimer timer = LongTaskTimer.builder(METRICS_COMPACTOR_MAJC_STUCK) .description("Number and duration of stuck major compactions").register(registry); CompactionWatcher.setTimer(timer); - - Gauge - .builder(METRICS_COMPACTOR_BUSY, this.compactionRunning, - isRunning -> isRunning.get() ? 1 : 0) - .description( - "Indicates if the compactor is busy or not. The value will be 0 when idle and 1 when busy.") - .register(registry); } - protected void startGCLogger(ScheduledThreadPoolExecutor schedExecutor) { - ScheduledFuture<?> future = - schedExecutor.scheduleWithFixedDelay(() -> gcLogger.logGCInfo(getConfiguration()), 0, - TIME_BETWEEN_GC_CHECKS, TimeUnit.MILLISECONDS); - ThreadPools.watchNonCriticalScheduledTask(future); - } - protected void startCancelChecker(ScheduledThreadPoolExecutor schedExecutor, long timeBetweenChecks) { ThreadPools.watchCriticalScheduledTask(schedExecutor.scheduleWithFixedDelay( diff --cc test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java index 2a26f6eb16,fc0ecdb881..b12645d815 --- a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java @@@ -102,8 -102,9 +102,7 @@@ public class MetricsIT extends Configur // @formatter:off Set<String> unexpectedMetrics = Set.of(METRICS_COMPACTOR_MAJC_STUCK, - METRICS_COMPACTOR_BUSY, - METRICS_REPLICATION_QUEUE, - METRICS_SCAN_YIELDS, - METRICS_UPDATE_ERRORS); + METRICS_SCAN_YIELDS); // add sserver as flaky until scan server included in mini tests. Set<String> flakyMetrics = Set.of(METRICS_FATE_TYPE_IN_PROGRESS,