This is an automated email from the ASF dual-hosted git repository. domgarguilo pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 32bbcbf351da18f590ce99dda64f305e904f1d5e Merge: 3fcab5e18a 627ed85555 Author: Dom Garguilo <domgargu...@apache.org> AuthorDate: Thu May 30 11:45:20 2024 -0400 Merge remote-tracking branch 'upstream/main' into elasticity .../accumulo/core/metrics/MetricsProducer.java | 2 + .../accumulo/server/compaction/FileCompactor.java | 88 +++++++++++++-- .../org/apache/accumulo/compactor/Compactor.java | 27 ++++- .../tserver/metrics/TabletServerMetrics.java | 24 ++++ .../compaction/ExternalCompactionProgressIT.java | 123 ++++++++++++++++++++- 5 files changed, 244 insertions(+), 20 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java index 0ed24a24b4,ca3d90ccf1..8e0c0a8d58 --- a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java +++ b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java @@@ -624,21 -567,10 +624,23 @@@ public interface MetricsProducer Logger LOG = LoggerFactory.getLogger(MetricsProducer.class); String METRICS_LOW_MEMORY = "accumulo.detected.low.memory"; + String METRICS_SERVER_IDLE = "accumulo.server.idle"; + String METRICS_COMPACTOR_PREFIX = "accumulo.compactor."; String METRICS_COMPACTOR_MAJC_STUCK = METRICS_COMPACTOR_PREFIX + "majc.stuck"; + String METRICS_COMPACTOR_QUEUE_PREFIX = METRICS_COMPACTOR_PREFIX + "queue."; + String METRICS_COMPACTOR_JOB_PRIORITY_QUEUES = METRICS_COMPACTOR_QUEUE_PREFIX + "count"; + String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH = METRICS_COMPACTOR_QUEUE_PREFIX + "length"; + String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_DEQUEUED = + METRICS_COMPACTOR_QUEUE_PREFIX + "jobs.dequeued"; + String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED = + METRICS_COMPACTOR_QUEUE_PREFIX + "jobs.queued"; + String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED = + METRICS_COMPACTOR_QUEUE_PREFIX + "jobs.rejected"; + String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY = + METRICS_COMPACTOR_QUEUE_PREFIX + "jobs.priority"; + String METRICS_COMPACTOR_ENTRIES_READ = METRICS_COMPACTOR_PREFIX + "entries.read"; + String METRICS_COMPACTOR_ENTRIES_WRITTEN = METRICS_COMPACTOR_PREFIX + "entries.written"; String METRICS_FATE_PREFIX = "accumulo.fate."; String METRICS_FATE_TYPE_IN_PROGRESS = METRICS_FATE_PREFIX + "ops.in.progress.by.type"; diff --cc server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java index 41ff9f5f51,cebdce684d..4d47b63053 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java @@@ -63,6 -65,8 +65,7 @@@ import org.apache.accumulo.core.tablets import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.LocalityGroupUtil; import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError; -import org.apache.accumulo.core.util.ratelimit.RateLimiter; + import org.apache.accumulo.core.util.time.NanoTime; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.iterators.SystemIteratorEnvironment; diff --cc server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 2922a3999e,b44fcbbca1..4c5357e35a --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@@ -149,13 -150,17 +150,21 @@@ public class Compactor extends Abstract protected Compactor(ConfigOpts opts, String[] args) { super("compactor", opts, args); - queueName = super.getConfiguration().get(Property.COMPACTOR_QUEUE_NAME); + } + + @Override + protected String getResourceGroupPropertyValue(SiteConfiguration conf) { + return conf.get(Property.COMPACTOR_GROUP_NAME); } + private long getTotalEntriesRead() { + return FileCompactor.getTotalEntriesRead(); + } + + private long getTotalEntriesWritten() { + return FileCompactor.getTotalEntriesWritten(); + } + @Override public void registerMetrics(MeterRegistry registry) { super.registerMetrics(registry); diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java index 30c8594f6c,70b0c4980b..847c39f572 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java @@@ -32,9 -36,28 +36,29 @@@ public class TabletServerMetrics implem util = new TabletServerMetricsUtil(tserver); } + private long getTotalEntriesRead() { + return FileCompactor.getTotalEntriesRead(); + } + + private long getTotalEntriesWritten() { + return FileCompactor.getTotalEntriesWritten(); + } + @Override public void registerMetrics(MeterRegistry registry) { + FunctionCounter + .builder(METRICS_COMPACTOR_ENTRIES_READ, this, TabletServerMetrics::getTotalEntriesRead) + .description("Number of entries read by all compactions that have run on this tserver") + .register(registry); + FunctionCounter + .builder(METRICS_COMPACTOR_ENTRIES_WRITTEN, this, + TabletServerMetrics::getTotalEntriesWritten) + .description("Number of entries written by all compactions that have run on this tserver") + .register(registry); - LongTaskTimer timer = LongTaskTimer.builder(METRICS_TSERVER_MAJC_STUCK) ++ LongTaskTimer timer = LongTaskTimer.builder(METRICS_COMPACTOR_MAJC_STUCK) + .description("Number and duration of stuck major compactions").register(registry); + CompactionWatcher.setTimer(timer); + Gauge .builder(METRICS_TSERVER_TABLETS_LONG_ASSIGNMENTS, util, TabletServerMetricsUtil::getLongTabletAssignments) diff --cc test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java index 1bcd075577,f77f2ea6a9..b9b41e0833 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java @@@ -18,7 -18,8 +18,8 @@@ */ package org.apache.accumulo.test.compaction; + import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; -import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QUEUE1; +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP1; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.compact; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.createTable; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.getRunningCompactions; @@@ -33,9 -34,12 +34,11 @@@ import java.util.EnumSet import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; + import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; + import java.util.concurrent.atomic.AtomicLong; -import org.apache.accumulo.compactor.Compactor; -import org.apache.accumulo.coordinator.CompactionCoordinator; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.IteratorSetting; @@@ -46,16 -50,21 +49,23 @@@ import org.apache.accumulo.core.data.Ta import org.apache.accumulo.core.iterators.IteratorUtil; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; + import org.apache.accumulo.core.metrics.MetricsProducer; +import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; import org.apache.accumulo.core.util.compaction.RunningCompactionInfo; import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.harness.AccumuloClusterHarness; -import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.test.functional.SlowIterator; + import org.apache.accumulo.test.metrics.TestStatsDRegistryFactory; + import org.apache.accumulo.test.metrics.TestStatsDSink; + import org.apache.accumulo.test.util.Wait; import org.apache.hadoop.conf.Configuration; import org.apache.thrift.TException; +import org.apache.thrift.transport.TTransportException; + import org.junit.jupiter.api.AfterAll; + import org.junit.jupiter.api.BeforeAll; + import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -85,7 -110,94 +113,92 @@@ public class ExternalCompactionProgress @Override public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) { ExternalCompactionTestUtils.configureMiniCluster(cfg, coreSite); + cfg.getClusterServerConfiguration().addCompactorResourceGroup(GROUP1, 1); + cfg.setProperty(Property.GENERAL_MICROMETER_ENABLED, "true"); + cfg.setProperty(Property.GENERAL_MICROMETER_FACTORY, TestStatsDRegistryFactory.class.getName()); + Map<String,String> sysProps = Map.of(TestStatsDRegistryFactory.SERVER_HOST, "127.0.0.1", + TestStatsDRegistryFactory.SERVER_PORT, Integer.toString(sink.getPort())); + cfg.setSystemProperties(sysProps); + } + + @Test + public void testProgressViaMetrics() throws Exception { + String table = this.getUniqueNames(1)[0]; + + try (AccumuloClient client = + Accumulo.newClient().from(getCluster().getClientProperties()).build()) { + createTable(client, table, "cs1"); + writeData(client, table, ROWS); + - cluster.getClusterControl().startCompactors(Compactor.class, 1, QUEUE1); - cluster.getClusterControl().startCoordinator(CompactionCoordinator.class); - - final AtomicLong expectedEntriesRead = new AtomicLong(9216); - final AtomicLong expectedEntriesWritten = new AtomicLong(4096); ++ final long expectedEntriesRead = 18432; ++ final long expectedEntriesWritten = 13312; + final AtomicLong totalEntriesRead = new AtomicLong(0); + final AtomicLong totalEntriesWritten = new AtomicLong(0); + + Thread checkerThread = getMetricsCheckerThread(totalEntriesRead, totalEntriesWritten); + checkerThread.start(); + + IteratorSetting setting = new IteratorSetting(50, "Slow", SlowIterator.class); + SlowIterator.setSleepTime(setting, 1); + client.tableOperations().attachIterator(table, setting, + EnumSet.of(IteratorUtil.IteratorScope.majc)); + log.info("Compacting table"); - compact(client, table, 2, QUEUE1, true); ++ compact(client, table, 2, GROUP1, true); + log.info("Done Compacting table"); + verify(client, table, 2, ROWS); + + Wait.waitFor(() -> { - if (totalEntriesRead.get() == expectedEntriesRead.get() - && totalEntriesWritten.get() == expectedEntriesWritten.get()) { ++ if (totalEntriesRead.get() == expectedEntriesRead ++ && totalEntriesWritten.get() == expectedEntriesWritten) { + return true; + } + log.info( + "Waiting for entries read to be {} (currently {}) and entries written to be {} (currently {})", - expectedEntriesRead.get(), totalEntriesRead.get(), expectedEntriesWritten.get(), ++ expectedEntriesRead, totalEntriesRead.get(), expectedEntriesWritten, + totalEntriesWritten.get()); + return false; + }, 30000, 3000, "Entries read and written metrics values did not match expected values"); + + stopCheckerThread.set(true); + checkerThread.join(); + } + } + + /** + * Get a thread that checks the metrics for entries read and written. + * + * @param totalEntriesRead this is set to the value of the entries read metric + * @param totalEntriesWritten this is set to the value of the entries written metric + */ + private static Thread getMetricsCheckerThread(AtomicLong totalEntriesRead, + AtomicLong totalEntriesWritten) { + return Threads.createThread("metric-tailer", () -> { + log.info("Starting metric tailer"); + + sink.getLines().clear(); + + while (!stopCheckerThread.get()) { + List<String> statsDMetrics = sink.getLines(); + for (String s : statsDMetrics) { + if (stopCheckerThread.get()) { + break; + } + if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_ENTRIES_READ)) { + TestStatsDSink.Metric e = TestStatsDSink.parseStatsDMetric(s); + int value = Integer.parseInt(e.getValue()); + totalEntriesRead.addAndGet(value); + log.info("Found entries.read metric: {} with value: {}", e.getName(), value); + } else if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_ENTRIES_WRITTEN)) { + TestStatsDSink.Metric e = TestStatsDSink.parseStatsDMetric(s); + int value = Integer.parseInt(e.getValue()); + totalEntriesWritten.addAndGet(value); + log.info("Found entries.written metric: {} with value: {}", e.getName(), value); + } + } + sleepUninterruptibly(3000, TimeUnit.MILLISECONDS); + } + log.info("Metric tailer thread finished"); + }); } @Test