This is an automated email from the ASF dual-hosted git repository.

ddanielr pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit cbb9efdece6a03855d270ac5fc1e06a16b12e407
Merge: 552adf2e1e 3c5bb40ae9
Author: Daniel Roberts <[email protected]>
AuthorDate: Wed Jul 24 14:00:24 2024 +0000

    Merge branch 'main' into elasticity

 .../org/apache/accumulo/server/AbstractServer.java |  11 +-
 .../org/apache/accumulo/compactor/Compactor.java   |  16 +--
 .../org/apache/accumulo/tserver/ScanServer.java    |   4 +-
 .../org/apache/accumulo/tserver/TabletServer.java  |   6 +-
 .../test/functional/IdleProcessMetricsIT.java      | 113 +++++++++++++++++++++
 5 files changed, 135 insertions(+), 15 deletions(-)

diff --cc 
server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
index ae9a6e8ec6,dc46e0ecd0..be9456f152
--- a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
@@@ -21,8 -21,6 +21,7 @@@ package org.apache.accumulo.server
  import java.util.concurrent.ScheduledFuture;
  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.atomic.AtomicReference;
 +import java.util.function.Function;
- import java.util.function.Supplier;
  
  import org.apache.accumulo.core.Constants;
  import org.apache.accumulo.core.classloader.ClassLoaderUtil;
diff --cc 
test/src/main/java/org/apache/accumulo/test/functional/IdleProcessMetricsIT.java
index e802be7ff4,c57cfc8480..afee144e95
--- 
a/test/src/main/java/org/apache/accumulo/test/functional/IdleProcessMetricsIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/IdleProcessMetricsIT.java
@@@ -18,7 -18,11 +18,11 @@@
   */
  package org.apache.accumulo.test.functional;
  
 -import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QUEUE1;
+ import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.compact;
+ import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.createTable;
+ import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.verify;
+ import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.writeData;
 +import static org.junit.jupiter.api.Assertions.assertEquals;
  import static org.junit.jupiter.api.Assertions.assertTrue;
  
  import java.time.Duration;
@@@ -26,12 -31,22 +31,20 @@@ import java.util.List
  import java.util.Map;
  import java.util.concurrent.atomic.AtomicBoolean;
  
 -import org.apache.accumulo.compactor.Compactor;
 -import org.apache.accumulo.coordinator.CompactionCoordinator;
 +import org.apache.accumulo.core.Constants;
+ import org.apache.accumulo.core.client.Accumulo;
+ import org.apache.accumulo.core.client.AccumuloClient;
+ import org.apache.accumulo.core.client.IteratorSetting;
+ import org.apache.accumulo.core.client.Scanner;
+ import org.apache.accumulo.core.client.ScannerBase;
  import org.apache.accumulo.core.conf.Property;
+ import org.apache.accumulo.core.iterators.IteratorUtil;
  import org.apache.accumulo.core.metrics.MetricsProducer;
+ import org.apache.accumulo.core.security.Authorizations;
  import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
  import org.apache.accumulo.harness.SharedMiniClusterBase;
+ import org.apache.accumulo.minicluster.ServerType;
  import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
 -import org.apache.accumulo.test.compaction.ExternalCompactionTestUtils;
  import org.apache.accumulo.test.metrics.TestStatsDRegistryFactory;
  import org.apache.accumulo.test.metrics.TestStatsDSink;
  import org.apache.accumulo.test.util.Wait;
@@@ -140,4 -147,99 +161,96 @@@ public class IdleProcessMetricsIT exten
      });
    }
  
+   /**
+    * Test that before during and after a compaction, the compactor will emit 
the appropriate value
+    * for the idle metric.
+    */
+   @Test
+   public void idleCompactorTest() throws Exception {
+     try (AccumuloClient client =
+         
Accumulo.newClient().from(getCluster().getClientProperties()).build()) {
+ 
 -      
getCluster().getClusterControl().startCoordinator(CompactionCoordinator.class);
 -      getCluster().getClusterControl().startCompactors(Compactor.class, 1, 
QUEUE1);
 -
+       // should emit the idle metric after the configured duration of 
GENERAL_IDLE_PROCESS_INTERVAL
+       Thread.sleep(idleProcessInterval.toMillis());
+ 
+       final String processName = "compactor";
+ 
+       log.info("Waiting for compactor to go idle");
+       waitForIdleMetricValueToBe(1, processName);
+ 
+       String table1 = getUniqueNames(1)[0];
+       createTable(client, table1, "cs1");
+       writeData(client, table1);
+ 
+       IteratorSetting setting = new IteratorSetting(50, "Slow", 
SlowIterator.class);
+       SlowIterator.setSleepTime(setting, 5);
+       client.tableOperations().attachIterator(table1, setting,
+           EnumSet.of(IteratorUtil.IteratorScope.majc));
+ 
 -      compact(client, table1, 2, QUEUE1, false);
++      compact(client, table1, 2, IDLE_RESOURCE_GROUP, false);
+ 
+       log.info("Waiting for compactor to be not idle after starting 
compaction");
+       waitForIdleMetricValueToBe(0, processName);
+ 
+       log.info("Waiting for compactor to go idle once compaction completes");
+       waitForIdleMetricValueToBe(1, processName);
+ 
+       verify(client, table1, 2);
+     }
+ 
+   }
+ 
+   /**
+    * Test that before during and after a scan, the scan server will emit the 
appropriate value for
+    * the idle metric.
+    */
+   @Test
+   public void idleScanServerTest() throws Exception {
+     try (AccumuloClient client =
+         
Accumulo.newClient().from(getCluster().getClientProperties()).build()) {
+ 
+       getCluster().getClusterControl().start(ServerType.SCAN_SERVER, 
"localhost");
+ 
+       // should emit the idle metric after the configured duration of 
GENERAL_IDLE_PROCESS_INTERVAL
+       Thread.sleep(idleProcessInterval.toMillis());
+ 
+       final String processName = "sserver";
+ 
+       log.info("Waiting for sserver to go idle");
+       waitForIdleMetricValueToBe(1, processName);
+ 
+       String table1 = getUniqueNames(1)[0];
+       createTable(client, table1, "cs1");
+       writeData(client, table1);
+ 
+       IteratorSetting setting = new IteratorSetting(50, "Slow", 
SlowIterator.class);
+       SlowIterator.setSleepTime(setting, 5);
+       client.tableOperations().attachIterator(table1, setting,
+           EnumSet.of(IteratorUtil.IteratorScope.scan));
+ 
+       try (Scanner scanner = client.createScanner(table1, 
Authorizations.EMPTY)) {
+         scanner.setConsistencyLevel(ScannerBase.ConsistencyLevel.EVENTUAL);
+         var ignored = scanner.stream().count();
+       }
+ 
+       log.info("Waiting for sserver to be not idle after starting a scan");
+       waitForIdleMetricValueToBe(0, processName);
+ 
+       log.info("Waiting for sserver to go idle once scan completes 
completes");
+       waitForIdleMetricValueToBe(1, processName);
+     }
+ 
+   }
+ 
+   private static void waitForIdleMetricValueToBe(int expectedValue, String 
processName) {
+     Wait.waitFor(
+         () -> sink.getLines().stream()
+             .filter(line -> 
line.startsWith(MetricsProducer.METRICS_SERVER_IDLE))
+             .map(TestStatsDSink::parseStatsDMetric)
+             .filter(a -> a.getTags().get("process.name").equals(processName))
+             .peek(a -> log.info("Idle metric: {}", a))
+             .anyMatch(a -> Integer.parseInt(a.getValue()) == expectedValue),
+         60_000, 2000, "Idle metric did not reach the expected value " + 
expectedValue);
+   }
+ 
  }

Reply via email to