This is an automated email from the ASF dual-hosted git repository. ddanielr pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit cbb9efdece6a03855d270ac5fc1e06a16b12e407 Merge: 552adf2e1e 3c5bb40ae9 Author: Daniel Roberts <[email protected]> AuthorDate: Wed Jul 24 14:00:24 2024 +0000 Merge branch 'main' into elasticity .../org/apache/accumulo/server/AbstractServer.java | 11 +- .../org/apache/accumulo/compactor/Compactor.java | 16 +-- .../org/apache/accumulo/tserver/ScanServer.java | 4 +- .../org/apache/accumulo/tserver/TabletServer.java | 6 +- .../test/functional/IdleProcessMetricsIT.java | 113 +++++++++++++++++++++ 5 files changed, 135 insertions(+), 15 deletions(-) diff --cc server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java index ae9a6e8ec6,dc46e0ecd0..be9456f152 --- a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java +++ b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java @@@ -21,8 -21,6 +21,7 @@@ package org.apache.accumulo.server import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; - import java.util.function.Supplier; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.classloader.ClassLoaderUtil; diff --cc test/src/main/java/org/apache/accumulo/test/functional/IdleProcessMetricsIT.java index e802be7ff4,c57cfc8480..afee144e95 --- a/test/src/main/java/org/apache/accumulo/test/functional/IdleProcessMetricsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/IdleProcessMetricsIT.java @@@ -18,7 -18,11 +18,11 @@@ */ package org.apache.accumulo.test.functional; -import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QUEUE1; + import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.compact; + import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.createTable; + import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.verify; + import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.writeData; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import java.time.Duration; @@@ -26,12 -31,22 +31,20 @@@ import java.util.List import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.accumulo.compactor.Compactor; -import org.apache.accumulo.coordinator.CompactionCoordinator; +import org.apache.accumulo.core.Constants; + import org.apache.accumulo.core.client.Accumulo; + import org.apache.accumulo.core.client.AccumuloClient; + import org.apache.accumulo.core.client.IteratorSetting; + import org.apache.accumulo.core.client.Scanner; + import org.apache.accumulo.core.client.ScannerBase; import org.apache.accumulo.core.conf.Property; + import org.apache.accumulo.core.iterators.IteratorUtil; import org.apache.accumulo.core.metrics.MetricsProducer; + import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.harness.MiniClusterConfigurationCallback; import org.apache.accumulo.harness.SharedMiniClusterBase; + import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; -import org.apache.accumulo.test.compaction.ExternalCompactionTestUtils; import org.apache.accumulo.test.metrics.TestStatsDRegistryFactory; import org.apache.accumulo.test.metrics.TestStatsDSink; import org.apache.accumulo.test.util.Wait; @@@ -140,4 -147,99 +161,96 @@@ public class IdleProcessMetricsIT exten }); } + /** + * Test that before during and after a compaction, the compactor will emit the appropriate value + * for the idle metric. + */ + @Test + public void idleCompactorTest() throws Exception { + try (AccumuloClient client = + Accumulo.newClient().from(getCluster().getClientProperties()).build()) { + - getCluster().getClusterControl().startCoordinator(CompactionCoordinator.class); - getCluster().getClusterControl().startCompactors(Compactor.class, 1, QUEUE1); - + // should emit the idle metric after the configured duration of GENERAL_IDLE_PROCESS_INTERVAL + Thread.sleep(idleProcessInterval.toMillis()); + + final String processName = "compactor"; + + log.info("Waiting for compactor to go idle"); + waitForIdleMetricValueToBe(1, processName); + + String table1 = getUniqueNames(1)[0]; + createTable(client, table1, "cs1"); + writeData(client, table1); + + IteratorSetting setting = new IteratorSetting(50, "Slow", SlowIterator.class); + SlowIterator.setSleepTime(setting, 5); + client.tableOperations().attachIterator(table1, setting, + EnumSet.of(IteratorUtil.IteratorScope.majc)); + - compact(client, table1, 2, QUEUE1, false); ++ compact(client, table1, 2, IDLE_RESOURCE_GROUP, false); + + log.info("Waiting for compactor to be not idle after starting compaction"); + waitForIdleMetricValueToBe(0, processName); + + log.info("Waiting for compactor to go idle once compaction completes"); + waitForIdleMetricValueToBe(1, processName); + + verify(client, table1, 2); + } + + } + + /** + * Test that before during and after a scan, the scan server will emit the appropriate value for + * the idle metric. + */ + @Test + public void idleScanServerTest() throws Exception { + try (AccumuloClient client = + Accumulo.newClient().from(getCluster().getClientProperties()).build()) { + + getCluster().getClusterControl().start(ServerType.SCAN_SERVER, "localhost"); + + // should emit the idle metric after the configured duration of GENERAL_IDLE_PROCESS_INTERVAL + Thread.sleep(idleProcessInterval.toMillis()); + + final String processName = "sserver"; + + log.info("Waiting for sserver to go idle"); + waitForIdleMetricValueToBe(1, processName); + + String table1 = getUniqueNames(1)[0]; + createTable(client, table1, "cs1"); + writeData(client, table1); + + IteratorSetting setting = new IteratorSetting(50, "Slow", SlowIterator.class); + SlowIterator.setSleepTime(setting, 5); + client.tableOperations().attachIterator(table1, setting, + EnumSet.of(IteratorUtil.IteratorScope.scan)); + + try (Scanner scanner = client.createScanner(table1, Authorizations.EMPTY)) { + scanner.setConsistencyLevel(ScannerBase.ConsistencyLevel.EVENTUAL); + var ignored = scanner.stream().count(); + } + + log.info("Waiting for sserver to be not idle after starting a scan"); + waitForIdleMetricValueToBe(0, processName); + + log.info("Waiting for sserver to go idle once scan completes completes"); + waitForIdleMetricValueToBe(1, processName); + } + + } + + private static void waitForIdleMetricValueToBe(int expectedValue, String processName) { + Wait.waitFor( + () -> sink.getLines().stream() + .filter(line -> line.startsWith(MetricsProducer.METRICS_SERVER_IDLE)) + .map(TestStatsDSink::parseStatsDMetric) + .filter(a -> a.getTags().get("process.name").equals(processName)) + .peek(a -> log.info("Idle metric: {}", a)) + .anyMatch(a -> Integer.parseInt(a.getValue()) == expectedValue), + 60_000, 2000, "Idle metric did not reach the expected value " + expectedValue); + } + }
