This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit f33ee9dc3c669acdd4e5b3a3b7dcd6cc682a8f12
Merge: d0b72f5465 5ab0bd8273
Author: Dom Garguilo <domgargu...@apache.org>
AuthorDate: Tue Jul 9 16:58:25 2024 -0400

    Merge remote-tracking branch 'upstream/main' into elasticity

 .../org/apache/accumulo/core/conf/Property.java    |  4 +-
 .../accumulo/core/metrics/MetricsProducer.java     | 16 +++----
 .../spi/balancer/HostRegexTableLoadBalancer.java   |  3 +-
 .../org/apache/accumulo/server/AbstractServer.java | 22 ++++++----
 .../accumulo/server/metrics/ProcessMetrics.java    | 18 ++++----
 .../org/apache/accumulo/compactor/Compactor.java   |  8 ----
 .../org/apache/accumulo/tserver/ScanServer.java    |  6 +--
 .../compaction/ExternalCompactionProgressIT.java   | 29 ++-----------
 .../test/functional/IdleProcessMetricsIT.java      | 50 ++++++++++++++--------
 .../apache/accumulo/test/metrics/MetricsIT.java    |  3 +-
 10 files changed, 71 insertions(+), 88 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
index 933c651297,ee6eb6e891..08d8bc6514
--- a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
+++ b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
@@@ -652,20 -594,8 +651,19 @@@ public interface MetricsProducer 
  
    String METRICS_COMPACTOR_PREFIX = "accumulo.compactor.";
    String METRICS_COMPACTOR_MAJC_STUCK = METRICS_COMPACTOR_PREFIX + 
"majc.stuck";
 +  String METRICS_COMPACTOR_QUEUE_PREFIX = METRICS_COMPACTOR_PREFIX + "queue.";
 +  String METRICS_COMPACTOR_JOB_PRIORITY_QUEUES = 
METRICS_COMPACTOR_QUEUE_PREFIX + "count";
 +  String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH = 
METRICS_COMPACTOR_QUEUE_PREFIX + "length";
 +  String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_DEQUEUED =
 +      METRICS_COMPACTOR_QUEUE_PREFIX + "jobs.dequeued";
 +  String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED =
 +      METRICS_COMPACTOR_QUEUE_PREFIX + "jobs.queued";
 +  String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED =
 +      METRICS_COMPACTOR_QUEUE_PREFIX + "jobs.rejected";
 +  String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY =
 +      METRICS_COMPACTOR_QUEUE_PREFIX + "jobs.priority";
    String METRICS_COMPACTOR_ENTRIES_READ = METRICS_COMPACTOR_PREFIX + 
"entries.read";
    String METRICS_COMPACTOR_ENTRIES_WRITTEN = METRICS_COMPACTOR_PREFIX + 
"entries.written";
-   String METRICS_COMPACTOR_BUSY = METRICS_COMPACTOR_PREFIX + "busy";
  
    String METRICS_FATE_PREFIX = "accumulo.fate.";
    String METRICS_FATE_TYPE_IN_PROGRESS = METRICS_FATE_PREFIX + 
"ops.in.progress.by.type";
diff --cc server/base/src/main/java/org/apache/accumulo/server/metrics/ProcessMetrics.java
index 17e8e42197,69de547e0b..e110e4d5a8
--- a/server/base/src/main/java/org/apache/accumulo/server/metrics/ProcessMetrics.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/metrics/ProcessMetrics.java
@@@ -18,6 -18,9 +18,8 @@@
   */
  package org.apache.accumulo.server.metrics;
  
 -import java.util.List;
+ import java.util.concurrent.atomic.AtomicInteger;
+ 
  import org.apache.accumulo.core.metrics.MetricsProducer;
  import org.apache.accumulo.server.ServerContext;
  
@@@ -35,14 -38,8 +37,8 @@@ public class ProcessMetrics implements 
  
    @Override
    public void registerMetrics(MeterRegistry registry) {
 -    registry.gauge(METRICS_LOW_MEMORY, List.of(), this, this::lowMemDetected);
 +    registry.gauge(METRICS_LOW_MEMORY, this, this::lowMemDetected);
-     idleCounter = registry.counter(METRICS_SERVER_IDLE);
-   }
- 
-   public void incrementIdleCounter() {
-     if (idleCounter != null) {
-       idleCounter.increment();
-     }
+     registry.gauge(METRICS_SERVER_IDLE, isIdle, AtomicInteger::get);
    }
  
    private int lowMemDetected(ProcessMetrics processMetrics) {
diff --cc test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
index 72c0fbe560,fb8e2d4ccc..f76b816d73
--- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
@@@ -35,9 -36,6 +35,8 @@@ import java.util.EnumSet
  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;
- import java.util.Objects;
 +import java.util.Optional;
 +import java.util.concurrent.ConcurrentHashMap;
  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.atomic.AtomicBoolean;
  import java.util.concurrent.atomic.AtomicLong;
@@@ -146,12 -216,10 +145,11 @@@ public class ExternalCompactionProgress
  
      final AtomicLong totalEntriesRead = new AtomicLong(0);
      final AtomicLong totalEntriesWritten = new AtomicLong(0);
 -    final long expectedEntriesRead = 9216;
 -    final long expectedEntriesWritten = 4096;
 +    final ConcurrentHashMap<String,Long> compactorBusy = new 
ConcurrentHashMap<>();
 +    final long expectedEntriesRead = 18432;
 +    final long expectedEntriesWritten = 13312;
  
-     Thread checkerThread =
-         getMetricsCheckerThread(totalEntriesRead, totalEntriesWritten, 
compactorBusy);
+     Thread checkerThread = getMetricsCheckerThread(totalEntriesRead, 
totalEntriesWritten);
  
      try (AccumuloClient client =
          
Accumulo.newClient().from(getCluster().getClientProperties()).build()) {
@@@ -166,14 -237,7 +164,7 @@@
            EnumSet.of(IteratorUtil.IteratorScope.majc));
        log.info("Compacting table");
  
-       Wait.waitFor(() -> computeBusyCount(GROUP1, compactorBusy) == 0, 30_000,
-           CHECKER_THREAD_SLEEP_MS, "Compactor busy metric should be false 
initially");
- 
-       compact(client, table, 2, GROUP1, false);
- 
-       Wait.waitFor(() -> computeBusyCount(GROUP1, compactorBusy) == 1, 30_000,
-           CHECKER_THREAD_SLEEP_MS,
-           "Compactor busy metric should be true after starting compaction");
 -      compact(client, table, 2, QUEUE1, true);
++      compact(client, table, 2, GROUP1, true);
  
        Wait.waitFor(() -> {
          if (totalEntriesRead.get() == expectedEntriesRead
diff --cc 
test/src/main/java/org/apache/accumulo/test/functional/IdleProcessMetricsIT.java
index 36d16b8d2d,6366d16ae9..e802be7ff4
--- 
a/test/src/main/java/org/apache/accumulo/test/functional/IdleProcessMetricsIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/IdleProcessMetricsIT.java
@@@ -18,7 -18,8 +18,8 @@@
   */
  package org.apache.accumulo.test.functional;
  
 -import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QUEUE1;
 +import static org.junit.jupiter.api.Assertions.assertEquals;
+ import static org.junit.jupiter.api.Assertions.assertTrue;
  
  import java.time.Duration;
  import java.util.List;
@@@ -30,9 -32,12 +31,10 @@@ import org.apache.accumulo.core.conf.Pr
  import org.apache.accumulo.core.metrics.MetricsProducer;
  import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
  import org.apache.accumulo.harness.SharedMiniClusterBase;
 -import org.apache.accumulo.minicluster.ServerType;
  import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
 -import org.apache.accumulo.test.compaction.ExternalCompactionTestUtils;
  import org.apache.accumulo.test.metrics.TestStatsDRegistryFactory;
  import org.apache.accumulo.test.metrics.TestStatsDSink;
+ import org.apache.accumulo.test.util.Wait;
  import org.apache.hadoop.conf.Configuration;
  import org.junit.jupiter.api.AfterAll;
  import org.junit.jupiter.api.BeforeAll;
@@@ -40,30 -47,21 +44,37 @@@ import org.slf4j.LoggerFactory
  
  public class IdleProcessMetricsIT extends SharedMiniClusterBase {
  
+   private static final Logger log = 
LoggerFactory.getLogger(IdleProcessMetricsIT.class);
+ 
+   static final Duration idleProcessInterval = Duration.ofSeconds(10);
+ 
++  public static final String IDLE_RESOURCE_GROUP = "IDLE_PROCESS_TEST";
++
    public static class IdleStopITConfig implements 
MiniClusterConfigurationCallback {
  
      @Override
      public void configureMiniCluster(MiniAccumuloConfigImpl cfg, 
Configuration coreSite) {
 -      ExternalCompactionTestUtils.configureMiniCluster(cfg, coreSite);
 -      cfg.setNumCompactors(1);
 -      cfg.setNumTservers(1);
 -      cfg.setNumScanServers(1);
 +
 +      // Verify expectations about the default config. Want to ensure there 
no other resource groups
 +      // configured.
 +      assertEquals(Map.of(Constants.DEFAULT_COMPACTION_SERVICE_NAME, 1),
 +          cfg.getClusterServerConfiguration().getCompactorConfiguration());
 +
 +      // Disable the default scan servers and compactors, just start 1
 +      // tablet server in the default group to host the root and metadata
 +      // tables
 +      cfg.getClusterServerConfiguration().setNumDefaultTabletServers(1);
 +      cfg.getClusterServerConfiguration().setNumDefaultScanServers(0);
 +      cfg.getClusterServerConfiguration().setNumDefaultCompactors(0);
 +
 +      // Add servers in a resource group that will not get any work. These
 +      // are the servers that should stop because they are idle.
-       
cfg.getClusterServerConfiguration().addTabletServerResourceGroup("IDLE_PROCESS_TEST",
 1);
-       
cfg.getClusterServerConfiguration().addScanServerResourceGroup("IDLE_PROCESS_TEST",
 1);
-       
cfg.getClusterServerConfiguration().addCompactorResourceGroup("IDLE_PROCESS_TEST",
 1);
++      
cfg.getClusterServerConfiguration().addTabletServerResourceGroup(IDLE_RESOURCE_GROUP,
 1);
++      
cfg.getClusterServerConfiguration().addScanServerResourceGroup(IDLE_RESOURCE_GROUP,
 1);
++      
cfg.getClusterServerConfiguration().addCompactorResourceGroup(IDLE_RESOURCE_GROUP,
 1);
  
-       cfg.setProperty(Property.GENERAL_IDLE_PROCESS_INTERVAL, "10s");
+       cfg.setProperty(Property.GENERAL_IDLE_PROCESS_INTERVAL,
+           idleProcessInterval.toSeconds() + "s");
  
        // Tell the server processes to use a StatsDMeterRegistry that will be 
configured
        // to push all metrics to the sink we started.
@@@ -100,32 -93,38 +111,33 @@@
    @Test
    public void testIdleStopMetrics() throws Exception {
  
-     // The server processes in the IDLE_PROCESS_TEST resource group
-     // should emit the idle metric after 10s of being idle based
-     // on the configuration for this test. Wait 20s before checking
-     // for it.
-     Thread.sleep(20_000);
 -    
getCluster().getClusterControl().startCoordinator(CompactionCoordinator.class);
 -    getCluster().getClusterControl().startCompactors(Compactor.class, 1, 
QUEUE1);
 -    getCluster().getClusterControl().start(ServerType.SCAN_SERVER, 
"localhost");
 -    getCluster().getClusterControl().start(ServerType.TABLET_SERVER);
--
-     List<String> statsDMetrics;
+     // should emit the idle metric after the configured duration of 
GENERAL_IDLE_PROCESS_INTERVAL
+     Thread.sleep(idleProcessInterval.toMillis());
  
      AtomicBoolean sawCompactor = new AtomicBoolean(false);
      AtomicBoolean sawSServer = new AtomicBoolean(false);
      AtomicBoolean sawTServer = new AtomicBoolean(false);
-     // loop until we run out of lines or until we see all expected metrics
-     while (!(statsDMetrics = sink.getLines()).isEmpty() && 
!sawCompactor.get() && !sawSServer.get()
-         && !sawTServer.get()) {
++
+     Wait.waitFor(() -> {
+       List<String> statsDMetrics = sink.getLines();
        statsDMetrics.stream().filter(line -> 
line.startsWith(MetricsProducer.METRICS_SERVER_IDLE))
-           .map(TestStatsDSink::parseStatsDMetric).forEach(a -> {
 -          .peek(log::info).map(TestStatsDSink::parseStatsDMetric).forEach(a 
-> {
++          .peek(log::info).map(TestStatsDSink::parseStatsDMetric)
++          .filter(a -> 
a.getTags().get("resource.group").equals(IDLE_RESOURCE_GROUP)).forEach(a -> {
              String processName = a.getTags().get("process.name");
-             if (processName.equals("tserver")) {
+             int value = Integer.parseInt(a.getValue());
+             assertTrue(value == 0 || value == 1 || value == -1, "Unexpected 
value " + value);
 -            if ("tserver".equals(processName) && value == 0) {
 -              // Expect tserver to never be idle
++            // check that the idle metric was emitted for each
++            if ("tserver".equals(processName) && value == 1) {
                sawTServer.set(true);
-             } else if (processName.equals("sserver")) {
+             } else if ("sserver".equals(processName) && value == 1) {
 -              // Expect scan server to be idle
                sawSServer.set(true);
-             } else if (processName.equals("compactor")) {
+             } else if ("compactor".equals(processName) && value == 1) {
 -              // Expect compactor to be idle
                sawCompactor.set(true);
              }
+ 
            });
-     }
+       return sawCompactor.get() && sawSServer.get() && sawTServer.get();
+     });
    }
  
  }

Reply via email to