This is an automated email from the ASF dual-hosted git repository. domgarguilo pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new d6eb1df8ee Add metrics for entries read and written during compactions (#4572) d6eb1df8ee is described below commit d6eb1df8ee3155dfbf1fe03548433d6a61566f8f Author: Dom G <domgargu...@apache.org> AuthorDate: Wed May 29 14:35:03 2024 -0400 Add metrics for entries read and written during compactions (#4572) * Add metrics for compaction entries read and written on compactors and tservers * Add IT to make sure these metrics behave as intended. --------- Co-authored-by: Keith Turner <ktur...@apache.org> --- .../accumulo/core/metrics/MetricsProducer.java | 2 + .../accumulo/server/compaction/FileCompactor.java | 87 +++++++++++++-- .../org/apache/accumulo/compactor/Compactor.java | 26 ++++- .../tserver/metrics/TabletServerMetrics.java | 19 ++++ .../compaction/ExternalCompactionProgressIT.java | 122 ++++++++++++++++++++- 5 files changed, 237 insertions(+), 19 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java index dd4489b87c..228baa2165 100644 --- a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java +++ b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java @@ -583,6 +583,8 @@ public interface MetricsProducer { String METRICS_COMPACTOR_PREFIX = "accumulo.compactor."; String METRICS_COMPACTOR_MAJC_STUCK = METRICS_COMPACTOR_PREFIX + "majc.stuck"; + String METRICS_COMPACTOR_ENTRIES_READ = METRICS_COMPACTOR_PREFIX + "entries.read"; + String METRICS_COMPACTOR_ENTRIES_WRITTEN = METRICS_COMPACTOR_PREFIX + "entries.written"; String METRICS_FATE_PREFIX = "accumulo.fate."; String METRICS_FATE_TYPE_IN_PROGRESS = METRICS_FATE_PREFIX + "ops.in.progress.by.type"; diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java index df59be89a3..77ec9f1696 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java @@ -21,6 +21,7 @@ package org.apache.accumulo.server.compaction; import java.io.IOException; import java.text.DateFormat; import java.text.SimpleDateFormat; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -32,6 +33,7 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.conf.AccumuloConfiguration; @@ -120,8 +122,19 @@ public class FileCompactor implements Callable<CompactionStats> { private String currentLocalityGroup = ""; private final long startTime; - private final AtomicLong entriesRead = new AtomicLong(0); - private final AtomicLong entriesWritten = new AtomicLong(0); + private final AtomicLong currentEntriesRead = new AtomicLong(0); + private final AtomicLong currentEntriesWritten = new AtomicLong(0); + + // These track the cumulative count of entries (read and written) that has been recorded in + // the global counts. Their purpose is to avoid double counting of metrics during the update of + // global statistics. + private final AtomicLong lastRecordedEntriesRead = new AtomicLong(0); + private final AtomicLong lastRecordedEntriesWritten = new AtomicLong(0); + + private static final LongAdder totalEntriesRead = new LongAdder(); + private static final LongAdder totalEntriesWritten = new LongAdder(); + private static volatile long lastUpdateTime = 0; + private final DateFormat dateFormatter = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS"); // a unique id to identify a compactor @@ -141,9 +154,61 @@ public class FileCompactor implements Callable<CompactionStats> { return currentLocalityGroup; } - private void clearStats() { - entriesRead.set(0); - entriesWritten.set(0); + private void clearCurrentEntryCounts() { + currentEntriesRead.set(0); + currentEntriesWritten.set(0); + } + + private void updateGlobalEntryCounts() { + updateTotalEntries(currentEntriesRead, lastRecordedEntriesRead, totalEntriesRead); + updateTotalEntries(currentEntriesWritten, lastRecordedEntriesWritten, totalEntriesWritten); + } + + /** + * Updates the total count of entries by adding the difference between the current count and the + * last recorded count to the total. + * + * @param current The current count of entries + * @param recorded The last recorded count of entries + * @param total The total count to add the difference to + */ + private void updateTotalEntries(AtomicLong current, AtomicLong recorded, LongAdder total) { + long currentCount = current.get(); + long lastRecorded = + recorded.getAndUpdate(recordedValue -> Math.max(recordedValue, currentCount)); + if (lastRecorded < currentCount) { + total.add(currentCount - lastRecorded); + } + } + + /** + * @return the total entries written by compactions over the lifetime of this process. + */ + public static long getTotalEntriesWritten() { + updateTotalEntries(); + return totalEntriesWritten.sum(); + } + + /** + * @return the total entries read by compactions over the lifetime of this process. + */ + public static long getTotalEntriesRead() { + updateTotalEntries(); + return totalEntriesRead.sum(); + } + + /** + * Updates total entries read and written for all currently running compactions. Compactions will + * update the global stats when they finish. This can be called to update them sooner. This method + * is rate limited, so it will not cause issues if called too frequently. + */ + private static void updateTotalEntries() { + long currentTime = System.nanoTime(); + if (currentTime - lastUpdateTime < Duration.ofMillis(100).toNanos()) { + return; + } + runningCompactions.forEach(FileCompactor::updateGlobalEntryCounts); + lastUpdateTime = currentTime; } protected static final Set<FileCompactor> runningCompactions = @@ -211,7 +276,7 @@ public class FileCompactor implements Callable<CompactionStats> { String threadStartDate = dateFormatter.format(new Date()); - clearStats(); + clearCurrentEntryCounts(); String oldThreadName = Thread.currentThread().getName(); String newThreadName = @@ -298,6 +363,8 @@ public class FileCompactor implements Callable<CompactionStats> { runningCompactions.remove(this); } + updateGlobalEntryCounts(); + try { if (mfw != null) { // compaction must not have finished successfully, so close its output file @@ -397,7 +464,7 @@ public class FileCompactor implements Callable<CompactionStats> { } CountingIterator citr = - new CountingIterator(new MultiIterator(iters, extent.toDataRange()), entriesRead); + new CountingIterator(new MultiIterator(iters, extent.toDataRange()), currentEntriesRead); SortedKeyValueIterator<Key,Value> delIter = DeletingIterator.wrap(citr, propagateDeletes, DeletingIterator.getBehavior(acuTableConf)); ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(delIter); @@ -425,7 +492,7 @@ public class FileCompactor implements Callable<CompactionStats> { if (entriesCompacted % 1024 == 0) { // Periodically update stats, do not want to do this too often since its volatile - entriesWritten.addAndGet(1024); + currentEntriesWritten.addAndGet(1024); } } @@ -480,11 +547,11 @@ public class FileCompactor implements Callable<CompactionStats> { } long getEntriesRead() { - return entriesRead.get(); + return currentEntriesRead.get(); } long getEntriesWritten() { - return entriesWritten.get(); + return currentEntriesWritten.get(); } long getStartTime() { diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 2b85401516..e5d0c186f3 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -119,6 +119,7 @@ import org.slf4j.LoggerFactory; import com.beust.jcommander.Parameter; import com.google.common.base.Preconditions; +import io.micrometer.core.instrument.FunctionCounter; import io.micrometer.core.instrument.LongTaskTimer; import io.micrometer.core.instrument.MeterRegistry; @@ -162,8 +163,23 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac queueName = opts.getQueueName(); } + private long getTotalEntriesRead() { + return FileCompactor.getTotalEntriesRead(); + } + + private long getTotalEntriesWritten() { + return FileCompactor.getTotalEntriesWritten(); + } + @Override public void registerMetrics(MeterRegistry registry) { + FunctionCounter.builder(METRICS_COMPACTOR_ENTRIES_READ, this, Compactor::getTotalEntriesRead) + .description("Number of entries read by all compactions that have run on this compactor") + .register(registry); + FunctionCounter + .builder(METRICS_COMPACTOR_ENTRIES_WRITTEN, this, Compactor::getTotalEntriesWritten) + .description("Number of entries written by all compactions that have run on this compactor") + .register(registry); LongTaskTimer timer = LongTaskTimer.builder(METRICS_COMPACTOR_MAJC_STUCK) .description("Number and duration of stuck major compactions").register(registry); CompactionWatcher.setTimer(timer); @@ -690,20 +706,20 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac // Compaction has started. There should only be one in the list CompactionInfo info = running.get(0); if (info != null) { + final long entriesRead = info.getEntriesRead(); + final long entriesWritten = info.getEntriesWritten(); if (inputEntries > 0) { - percentComplete = - Float.toString((info.getEntriesRead() / (float) inputEntries) * 100); + percentComplete = Float.toString((entriesRead / (float) inputEntries) * 100); } String message = String.format( "Compaction in progress, read %d of %d input entries ( %s %s ), written %d entries", - info.getEntriesRead(), inputEntries, percentComplete, "%", - info.getEntriesWritten()); + entriesRead, inputEntries, percentComplete, "%", entriesWritten); watcher.run(); try { LOG.debug("Updating coordinator with compaction progress: {}.", message); TCompactionStatusUpdate update = new TCompactionStatusUpdate(TCompactionState.IN_PROGRESS, message, - inputEntries, info.getEntriesRead(), info.getEntriesWritten()); + inputEntries, entriesRead, entriesWritten); updateCompactionState(job, update); } catch (RetriesExceededException e) { LOG.warn("Error updating coordinator with compaction progress, error: {}", diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java index 0695181c87..70b0c4980b 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java @@ -20,8 +20,10 @@ package org.apache.accumulo.tserver.metrics; import org.apache.accumulo.core.metrics.MetricsProducer; import org.apache.accumulo.server.compaction.CompactionWatcher; +import org.apache.accumulo.server.compaction.FileCompactor; import org.apache.accumulo.tserver.TabletServer; +import io.micrometer.core.instrument.FunctionCounter; import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.LongTaskTimer; import io.micrometer.core.instrument.MeterRegistry; @@ -34,8 +36,25 @@ public class TabletServerMetrics implements MetricsProducer { util = new TabletServerMetricsUtil(tserver); } + private long getTotalEntriesRead() { + return FileCompactor.getTotalEntriesRead(); + } + + private long getTotalEntriesWritten() { + return FileCompactor.getTotalEntriesWritten(); + } + @Override public void registerMetrics(MeterRegistry registry) { + FunctionCounter + .builder(METRICS_COMPACTOR_ENTRIES_READ, this, TabletServerMetrics::getTotalEntriesRead) + .description("Number of entries read by all compactions that have run on this tserver") + .register(registry); + FunctionCounter + .builder(METRICS_COMPACTOR_ENTRIES_WRITTEN, this, + TabletServerMetrics::getTotalEntriesWritten) + .description("Number of entries written by all compactions that have run on this tserver") + .register(registry); LongTaskTimer timer = LongTaskTimer.builder(METRICS_TSERVER_MAJC_STUCK) .description("Number and duration of stuck major compactions").register(registry); CompactionWatcher.setTimer(timer); diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java index a9afc4fd43..0cbbc3a97a 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java @@ -36,6 +36,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import org.apache.accumulo.compactor.Compactor; import org.apache.accumulo.coordinator.CompactionCoordinator; @@ -49,14 +50,21 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.iterators.IteratorUtil; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; +import org.apache.accumulo.core.metrics.MetricsProducer; import org.apache.accumulo.core.util.compaction.RunningCompactionInfo; import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.test.functional.SlowIterator; +import org.apache.accumulo.test.metrics.TestStatsDRegistryFactory; +import org.apache.accumulo.test.metrics.TestStatsDSink; +import org.apache.accumulo.test.util.Wait; import org.apache.hadoop.conf.Configuration; import org.apache.thrift.TException; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -79,11 +87,117 @@ public class ExternalCompactionProgressIT extends AccumuloClusterHarness { Map<String,RunningCompactionInfo> runningMap = new HashMap<>(); List<EC_PROGRESS> progressList = new ArrayList<>(); - private final AtomicBoolean compactionFinished = new AtomicBoolean(false); + private static final AtomicBoolean stopCheckerThread = new AtomicBoolean(false); + private static TestStatsDSink sink; + + @BeforeAll + public static void before() throws Exception { + sink = new TestStatsDSink(); + } + + @AfterAll + public static void after() throws Exception { + if (sink != null) { + sink.close(); + } + } + + @BeforeEach + public void setup() { + stopCheckerThread.set(false); + } @Override public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) { ExternalCompactionTestUtils.configureMiniCluster(cfg, coreSite); + cfg.setProperty(Property.GENERAL_MICROMETER_ENABLED, "true"); + cfg.setProperty(Property.GENERAL_MICROMETER_FACTORY, TestStatsDRegistryFactory.class.getName()); + Map<String,String> sysProps = Map.of(TestStatsDRegistryFactory.SERVER_HOST, "127.0.0.1", + TestStatsDRegistryFactory.SERVER_PORT, Integer.toString(sink.getPort())); + cfg.setSystemProperties(sysProps); + } + + @Test + public void testProgressViaMetrics() throws Exception { + String table = this.getUniqueNames(1)[0]; + + try (AccumuloClient client = + Accumulo.newClient().from(getCluster().getClientProperties()).build()) { + createTable(client, table, "cs1"); + writeData(client, table, ROWS); + + cluster.getClusterControl().startCompactors(Compactor.class, 1, QUEUE1); + cluster.getClusterControl().startCoordinator(CompactionCoordinator.class); + + final AtomicLong expectedEntriesRead = new AtomicLong(9216); + final AtomicLong expectedEntriesWritten = new AtomicLong(4096); + final AtomicLong totalEntriesRead = new AtomicLong(0); + final AtomicLong totalEntriesWritten = new AtomicLong(0); + + Thread checkerThread = getMetricsCheckerThread(totalEntriesRead, totalEntriesWritten); + checkerThread.start(); + + IteratorSetting setting = new IteratorSetting(50, "Slow", SlowIterator.class); + SlowIterator.setSleepTime(setting, 1); + client.tableOperations().attachIterator(table, setting, + EnumSet.of(IteratorUtil.IteratorScope.majc)); + log.info("Compacting table"); + compact(client, table, 2, QUEUE1, true); + log.info("Done Compacting table"); + verify(client, table, 2, ROWS); + + Wait.waitFor(() -> { + if (totalEntriesRead.get() == expectedEntriesRead.get() + && totalEntriesWritten.get() == expectedEntriesWritten.get()) { + return true; + } + log.info( + "Waiting for entries read to be {} (currently {}) and entries written to be {} (currently {})", + expectedEntriesRead.get(), totalEntriesRead.get(), expectedEntriesWritten.get(), + totalEntriesWritten.get()); + return false; + }, 30000, 3000, "Entries read and written metrics values did not match expected values"); + + stopCheckerThread.set(true); + checkerThread.join(); + } + } + + /** + * Get a thread that checks the metrics for entries read and written. + * + * @param totalEntriesRead this is set to the value of the entries read metric + * @param totalEntriesWritten this is set to the value of the entries written metric + */ + private static Thread getMetricsCheckerThread(AtomicLong totalEntriesRead, + AtomicLong totalEntriesWritten) { + return Threads.createThread("metric-tailer", () -> { + log.info("Starting metric tailer"); + + sink.getLines().clear(); + + while (!stopCheckerThread.get()) { + List<String> statsDMetrics = sink.getLines(); + for (String s : statsDMetrics) { + if (stopCheckerThread.get()) { + break; + } + if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_ENTRIES_READ)) { + TestStatsDSink.Metric e = TestStatsDSink.parseStatsDMetric(s); + int value = Integer.parseInt(e.getValue()); + totalEntriesRead.addAndGet(value); + log.info("Found entries.read metric: {} with value: {}", e.getName(), value); + } else if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_ENTRIES_WRITTEN)) { + TestStatsDSink.Metric e = TestStatsDSink.parseStatsDMetric(s); + int value = Integer.parseInt(e.getValue()); + totalEntriesWritten.addAndGet(value); + log.info("Found entries.written metric: {} with value: {}", e.getName(), value); + } + } + sleepUninterruptibly(3000, TimeUnit.MILLISECONDS); + } + log.info("Metric tailer thread finished"); + }); } @Test @@ -109,7 +223,7 @@ public class ExternalCompactionProgressIT extends AccumuloClusterHarness { verify(client, table1, 2, ROWS); log.info("Done Compacting table"); - compactionFinished.set(true); + stopCheckerThread.set(true); checkerThread.join(); verifyProgress(); @@ -164,7 +278,7 @@ public class ExternalCompactionProgressIT extends AccumuloClusterHarness { client.tableOperations().compact(tableName2, new CompactionConfig().setWait(true).setIterators(List.of(setting))); log.info("Finished compacting table " + tableName2); - compactionFinished.set(true); + stopCheckerThread.set(true); log.info("Waiting on progress checker thread"); checkerThread.join(); @@ -194,7 +308,7 @@ public class ExternalCompactionProgressIT extends AccumuloClusterHarness { public Thread startChecker() { return Threads.createThread("RC checker", () -> { try { - while (!compactionFinished.get()) { + while (!stopCheckerThread.get()) { checkRunning(); sleepUninterruptibly(1000, TimeUnit.MILLISECONDS); }