This is an automated email from the ASF dual-hosted git repository. domgarguilo pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit f33ee9dc3c669acdd4e5b3a3b7dcd6cc682a8f12 Merge: d0b72f5465 5ab0bd8273 Author: Dom Garguilo <domgargu...@apache.org> AuthorDate: Tue Jul 9 16:58:25 2024 -0400 Merge remote-tracking branch 'upstream/main' into elasticity .../org/apache/accumulo/core/conf/Property.java | 4 +- .../accumulo/core/metrics/MetricsProducer.java | 16 +++---- .../spi/balancer/HostRegexTableLoadBalancer.java | 3 +- .../org/apache/accumulo/server/AbstractServer.java | 22 ++++++---- .../accumulo/server/metrics/ProcessMetrics.java | 18 ++++---- .../org/apache/accumulo/compactor/Compactor.java | 8 ---- .../org/apache/accumulo/tserver/ScanServer.java | 6 +-- .../compaction/ExternalCompactionProgressIT.java | 29 ++----------- .../test/functional/IdleProcessMetricsIT.java | 50 ++++++++++++++-------- .../apache/accumulo/test/metrics/MetricsIT.java | 3 +- 10 files changed, 71 insertions(+), 88 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java index 933c651297,ee6eb6e891..08d8bc6514 --- a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java +++ b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java @@@ -652,20 -594,8 +651,19 @@@ public interface MetricsProducer String METRICS_COMPACTOR_PREFIX = "accumulo.compactor."; String METRICS_COMPACTOR_MAJC_STUCK = METRICS_COMPACTOR_PREFIX + "majc.stuck"; + String METRICS_COMPACTOR_QUEUE_PREFIX = METRICS_COMPACTOR_PREFIX + "queue."; + String METRICS_COMPACTOR_JOB_PRIORITY_QUEUES = METRICS_COMPACTOR_QUEUE_PREFIX + "count"; + String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH = METRICS_COMPACTOR_QUEUE_PREFIX + "length"; + String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_DEQUEUED = + METRICS_COMPACTOR_QUEUE_PREFIX + "jobs.dequeued"; + String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED = + METRICS_COMPACTOR_QUEUE_PREFIX + "jobs.queued"; + String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED = + METRICS_COMPACTOR_QUEUE_PREFIX + "jobs.rejected"; + String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY = + METRICS_COMPACTOR_QUEUE_PREFIX + "jobs.priority"; String METRICS_COMPACTOR_ENTRIES_READ = METRICS_COMPACTOR_PREFIX + "entries.read"; String METRICS_COMPACTOR_ENTRIES_WRITTEN = METRICS_COMPACTOR_PREFIX + "entries.written"; - String METRICS_COMPACTOR_BUSY = METRICS_COMPACTOR_PREFIX + "busy"; String METRICS_FATE_PREFIX = "accumulo.fate."; String METRICS_FATE_TYPE_IN_PROGRESS = METRICS_FATE_PREFIX + "ops.in.progress.by.type"; diff --cc server/base/src/main/java/org/apache/accumulo/server/metrics/ProcessMetrics.java index 17e8e42197,69de547e0b..e110e4d5a8 --- a/server/base/src/main/java/org/apache/accumulo/server/metrics/ProcessMetrics.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metrics/ProcessMetrics.java @@@ -18,6 -18,9 +18,8 @@@ */ package org.apache.accumulo.server.metrics; -import java.util.List; + import java.util.concurrent.atomic.AtomicInteger; + import org.apache.accumulo.core.metrics.MetricsProducer; import org.apache.accumulo.server.ServerContext; @@@ -35,14 -38,8 +37,8 @@@ public class ProcessMetrics implements @Override public void registerMetrics(MeterRegistry registry) { - registry.gauge(METRICS_LOW_MEMORY, List.of(), this, this::lowMemDetected); + registry.gauge(METRICS_LOW_MEMORY, this, this::lowMemDetected); - idleCounter = registry.counter(METRICS_SERVER_IDLE); - } - - public void incrementIdleCounter() { - if (idleCounter != null) { - idleCounter.increment(); - } + registry.gauge(METRICS_SERVER_IDLE, isIdle, AtomicInteger::get); } private int lowMemDetected(ProcessMetrics processMetrics) { diff --cc test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java index 72c0fbe560,fb8e2d4ccc..f76b816d73 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java @@@ -35,9 -36,6 +35,8 @@@ import java.util.EnumSet import java.util.HashMap; import java.util.List; import java.util.Map; - import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@@ -146,12 -216,10 +145,11 @@@ public class ExternalCompactionProgress final AtomicLong totalEntriesRead = new AtomicLong(0); final AtomicLong totalEntriesWritten = new AtomicLong(0); - final long expectedEntriesRead = 9216; - final long expectedEntriesWritten = 4096; + final ConcurrentHashMap<String,Long> compactorBusy = new ConcurrentHashMap<>(); + final long expectedEntriesRead = 18432; + final long expectedEntriesWritten = 13312; - Thread checkerThread = - getMetricsCheckerThread(totalEntriesRead, totalEntriesWritten, compactorBusy); + Thread checkerThread = getMetricsCheckerThread(totalEntriesRead, totalEntriesWritten); try (AccumuloClient client = Accumulo.newClient().from(getCluster().getClientProperties()).build()) { @@@ -166,14 -237,7 +164,7 @@@ EnumSet.of(IteratorUtil.IteratorScope.majc)); log.info("Compacting table"); - Wait.waitFor(() -> computeBusyCount(GROUP1, compactorBusy) == 0, 30_000, - CHECKER_THREAD_SLEEP_MS, "Compactor busy metric should be false initially"); - - compact(client, table, 2, GROUP1, false); - - Wait.waitFor(() -> computeBusyCount(GROUP1, compactorBusy) == 1, 30_000, - CHECKER_THREAD_SLEEP_MS, - "Compactor busy metric should be true after starting compaction"); - compact(client, table, 2, QUEUE1, true); ++ compact(client, table, 2, GROUP1, true); Wait.waitFor(() -> { if (totalEntriesRead.get() == expectedEntriesRead diff --cc test/src/main/java/org/apache/accumulo/test/functional/IdleProcessMetricsIT.java index 36d16b8d2d,6366d16ae9..e802be7ff4 --- a/test/src/main/java/org/apache/accumulo/test/functional/IdleProcessMetricsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/IdleProcessMetricsIT.java @@@ -18,7 -18,8 +18,8 @@@ */ package org.apache.accumulo.test.functional; -import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QUEUE1; +import static org.junit.jupiter.api.Assertions.assertEquals; + import static org.junit.jupiter.api.Assertions.assertTrue; import java.time.Duration; import java.util.List; @@@ -30,9 -32,12 +31,10 @@@ import org.apache.accumulo.core.conf.Pr import org.apache.accumulo.core.metrics.MetricsProducer; import org.apache.accumulo.harness.MiniClusterConfigurationCallback; import org.apache.accumulo.harness.SharedMiniClusterBase; -import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; -import org.apache.accumulo.test.compaction.ExternalCompactionTestUtils; import org.apache.accumulo.test.metrics.TestStatsDRegistryFactory; import org.apache.accumulo.test.metrics.TestStatsDSink; + import org.apache.accumulo.test.util.Wait; import org.apache.hadoop.conf.Configuration; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@@ -40,30 -47,21 +44,37 @@@ import org.slf4j.LoggerFactory public class IdleProcessMetricsIT extends SharedMiniClusterBase { + private static final Logger log = LoggerFactory.getLogger(IdleProcessMetricsIT.class); + + static final Duration idleProcessInterval = Duration.ofSeconds(10); + ++ public static final String IDLE_RESOURCE_GROUP = "IDLE_PROCESS_TEST"; ++ public static class IdleStopITConfig implements MiniClusterConfigurationCallback { @Override public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) { - ExternalCompactionTestUtils.configureMiniCluster(cfg, coreSite); - cfg.setNumCompactors(1); - cfg.setNumTservers(1); - cfg.setNumScanServers(1); + + // Verify expectations about the default config. Want to ensure there no other resource groups + // configured. + assertEquals(Map.of(Constants.DEFAULT_COMPACTION_SERVICE_NAME, 1), + cfg.getClusterServerConfiguration().getCompactorConfiguration()); + + // Disable the default scan servers and compactors, just start 1 + // tablet server in the default group to host the root and metadata + // tables + cfg.getClusterServerConfiguration().setNumDefaultTabletServers(1); + cfg.getClusterServerConfiguration().setNumDefaultScanServers(0); + cfg.getClusterServerConfiguration().setNumDefaultCompactors(0); + + // Add servers in a resource group that will not get any work. These + // are the servers that should stop because they are idle. - cfg.getClusterServerConfiguration().addTabletServerResourceGroup("IDLE_PROCESS_TEST", 1); - cfg.getClusterServerConfiguration().addScanServerResourceGroup("IDLE_PROCESS_TEST", 1); - cfg.getClusterServerConfiguration().addCompactorResourceGroup("IDLE_PROCESS_TEST", 1); ++ cfg.getClusterServerConfiguration().addTabletServerResourceGroup(IDLE_RESOURCE_GROUP, 1); ++ cfg.getClusterServerConfiguration().addScanServerResourceGroup(IDLE_RESOURCE_GROUP, 1); ++ cfg.getClusterServerConfiguration().addCompactorResourceGroup(IDLE_RESOURCE_GROUP, 1); - cfg.setProperty(Property.GENERAL_IDLE_PROCESS_INTERVAL, "10s"); + cfg.setProperty(Property.GENERAL_IDLE_PROCESS_INTERVAL, + idleProcessInterval.toSeconds() + "s"); // Tell the server processes to use a StatsDMeterRegistry that will be configured // to push all metrics to the sink we started. @@@ -100,32 -93,38 +111,33 @@@ @Test public void testIdleStopMetrics() throws Exception { - // The server processes in the IDLE_PROCESS_TEST resource group - // should emit the idle metric after 10s of being idle based - // on the configuration for this test. Wait 20s before checking - // for it. - Thread.sleep(20_000); - getCluster().getClusterControl().startCoordinator(CompactionCoordinator.class); - getCluster().getClusterControl().startCompactors(Compactor.class, 1, QUEUE1); - getCluster().getClusterControl().start(ServerType.SCAN_SERVER, "localhost"); - getCluster().getClusterControl().start(ServerType.TABLET_SERVER); -- - List<String> statsDMetrics; + // should emit the idle metric after the configured duration of GENERAL_IDLE_PROCESS_INTERVAL + Thread.sleep(idleProcessInterval.toMillis()); AtomicBoolean sawCompactor = new AtomicBoolean(false); AtomicBoolean sawSServer = new AtomicBoolean(false); AtomicBoolean sawTServer = new AtomicBoolean(false); - // loop until we run out of lines or until we see all expected metrics - while (!(statsDMetrics = sink.getLines()).isEmpty() && !sawCompactor.get() && !sawSServer.get() - && !sawTServer.get()) { ++ + Wait.waitFor(() -> { + List<String> statsDMetrics = sink.getLines(); statsDMetrics.stream().filter(line -> line.startsWith(MetricsProducer.METRICS_SERVER_IDLE)) - .map(TestStatsDSink::parseStatsDMetric).forEach(a -> { - .peek(log::info).map(TestStatsDSink::parseStatsDMetric).forEach(a -> { ++ .peek(log::info).map(TestStatsDSink::parseStatsDMetric) ++ .filter(a -> a.getTags().get("resource.group").equals(IDLE_RESOURCE_GROUP)).forEach(a -> { String processName = a.getTags().get("process.name"); - if (processName.equals("tserver")) { + int value = Integer.parseInt(a.getValue()); + assertTrue(value == 0 || value == 1 || value == -1, "Unexpected value " + value); - if ("tserver".equals(processName) && value == 0) { - // Expect tserver to never be idle ++ // check that the idle metric was emitted for each ++ if ("tserver".equals(processName) && value == 1) { sawTServer.set(true); - } else if (processName.equals("sserver")) { + } else if ("sserver".equals(processName) && value == 1) { - // Expect scan server to be idle sawSServer.set(true); - } else if (processName.equals("compactor")) { + } else if ("compactor".equals(processName) && value == 1) { - // Expect compactor to be idle sawCompactor.set(true); } + }); - } + return sawCompactor.get() && sawSServer.get() && sawTServer.get(); + }); } }