This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new d6eb1df8ee Add metrics for entries read and written during compactions 
(#4572)
d6eb1df8ee is described below

commit d6eb1df8ee3155dfbf1fe03548433d6a61566f8f
Author: Dom G <domgargu...@apache.org>
AuthorDate: Wed May 29 14:35:03 2024 -0400

    Add metrics for entries read and written during compactions (#4572)
    
    * Add metrics for compaction entries read and written on compactors and 
tservers
    * Add IT to make sure these metrics behave as intended.
    
    ---------
    
    Co-authored-by: Keith Turner <ktur...@apache.org>
---
 .../accumulo/core/metrics/MetricsProducer.java     |   2 +
 .../accumulo/server/compaction/FileCompactor.java  |  87 +++++++++++++--
 .../org/apache/accumulo/compactor/Compactor.java   |  26 ++++-
 .../tserver/metrics/TabletServerMetrics.java       |  19 ++++
 .../compaction/ExternalCompactionProgressIT.java   | 122 ++++++++++++++++++++-
 5 files changed, 237 insertions(+), 19 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java 
b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
index dd4489b87c..228baa2165 100644
--- a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
+++ b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
@@ -583,6 +583,8 @@ public interface MetricsProducer {
 
   String METRICS_COMPACTOR_PREFIX = "accumulo.compactor.";
   String METRICS_COMPACTOR_MAJC_STUCK = METRICS_COMPACTOR_PREFIX + 
"majc.stuck";
+  String METRICS_COMPACTOR_ENTRIES_READ = METRICS_COMPACTOR_PREFIX + 
"entries.read";
+  String METRICS_COMPACTOR_ENTRIES_WRITTEN = METRICS_COMPACTOR_PREFIX + 
"entries.written";
 
   String METRICS_FATE_PREFIX = "accumulo.fate.";
   String METRICS_FATE_TYPE_IN_PROGRESS = METRICS_FATE_PREFIX + 
"ops.in.progress.by.type";
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
 
b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
index df59be89a3..77ec9f1696 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
@@ -21,6 +21,7 @@ package org.apache.accumulo.server.compaction;
 import java.io.IOException;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -32,6 +33,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
 
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -120,8 +122,19 @@ public class FileCompactor implements 
Callable<CompactionStats> {
   private String currentLocalityGroup = "";
   private final long startTime;
 
-  private final AtomicLong entriesRead = new AtomicLong(0);
-  private final AtomicLong entriesWritten = new AtomicLong(0);
+  private final AtomicLong currentEntriesRead = new AtomicLong(0);
+  private final AtomicLong currentEntriesWritten = new AtomicLong(0);
+
+  // These track the cumulative count of entries (read and written) that has 
been recorded in
+  // the global counts. Their purpose is to avoid double counting of metrics 
during the update of
+  // global statistics.
+  private final AtomicLong lastRecordedEntriesRead = new AtomicLong(0);
+  private final AtomicLong lastRecordedEntriesWritten = new AtomicLong(0);
+
+  private static final LongAdder totalEntriesRead = new LongAdder();
+  private static final LongAdder totalEntriesWritten = new LongAdder();
+  private static volatile long lastUpdateTime = 0;
+
   private final DateFormat dateFormatter = new SimpleDateFormat("yyyy/MM/dd 
HH:mm:ss.SSS");
 
   // a unique id to identify a compactor
@@ -141,9 +154,61 @@ public class FileCompactor implements 
Callable<CompactionStats> {
     return currentLocalityGroup;
   }
 
-  private void clearStats() {
-    entriesRead.set(0);
-    entriesWritten.set(0);
+  /**
+   * Resets the per-compaction read and written entry counters of this compactor to zero.
+   */
+  private void clearCurrentEntryCounts() {
+    currentEntriesWritten.set(0);
+    currentEntriesRead.set(0);
+  }
+
+  /**
+   * Pushes this compactor's read and written entry counts into the static totals. Each counter is
+   * passed together with its own last-recorded marker and the matching total.
+   */
+  private void updateGlobalEntryCounts() {
+    updateTotalEntries(currentEntriesWritten, lastRecordedEntriesWritten, totalEntriesWritten);
+    updateTotalEntries(currentEntriesRead, lastRecordedEntriesRead, totalEntriesRead);
+  }
+
+  /**
+   * Adds to {@code total} the part of {@code current} that has not been accounted for yet, and
+   * raises {@code recorded} to the observed count so the same entries are not added twice.
+   *
+   * @param current The current count of entries
+   * @param recorded The last recorded count of entries; raised to the current count when lower
+   * @param total The total count to add the difference to
+   */
+  private void updateTotalEntries(AtomicLong current, AtomicLong recorded, LongAdder total) {
+    final long observed = current.get();
+    // Atomically lift the marker to max(marker, observed) and fetch the value it held before.
+    final long previous = recorded.getAndAccumulate(observed, Math::max);
+    if (observed > previous) {
+      total.add(observed - previous);
+    }
+  }
+
+  /**
+   * Asks for a (rate limited) refresh of the global counts, then reports the written total.
+   *
+   * @return the total entries written by compactions over the lifetime of this process.
+   */
+  public static long getTotalEntriesWritten() {
+    updateTotalEntries();
+    final long written = totalEntriesWritten.sum();
+    return written;
+  }
+
+  /**
+   * Asks for a (rate limited) refresh of the global counts, then reports the read total.
+   *
+   * @return the total entries read by compactions over the lifetime of this process.
+   */
+  public static long getTotalEntriesRead() {
+    updateTotalEntries();
+    final long read = totalEntriesRead.sum();
+    return read;
+  }
+
+  /**
+   * Updates total entries read and written for all currently running compactions. Compactions will
+   * update the global stats when they finish. This can be called to update them sooner. This method
+   * is rate limited to one refresh per 100 milliseconds, so it will not cause issues if called too
+   * frequently.
+   */
+  private static void updateTotalEntries() {
+    final long currentTime = System.nanoTime();
+    // Read the volatile once. Zero is the field's initial value and means no refresh has run yet.
+    // System.nanoTime() has an arbitrary origin and may be negative, so the elapsed-time check is
+    // only meaningful against a timestamp that an earlier call actually stored.
+    final long previousUpdate = lastUpdateTime;
+    if (previousUpdate != 0
+        && currentTime - previousUpdate < Duration.ofMillis(100).toNanos()) {
+      return;
+    }
+    runningCompactions.forEach(FileCompactor::updateGlobalEntryCounts);
+    lastUpdateTime = currentTime;
   }
 
   protected static final Set<FileCompactor> runningCompactions =
@@ -211,7 +276,7 @@ public class FileCompactor implements 
Callable<CompactionStats> {
 
     String threadStartDate = dateFormatter.format(new Date());
 
-    clearStats();
+    clearCurrentEntryCounts();
 
     String oldThreadName = Thread.currentThread().getName();
     String newThreadName =
@@ -298,6 +363,8 @@ public class FileCompactor implements 
Callable<CompactionStats> {
         runningCompactions.remove(this);
       }
 
+      updateGlobalEntryCounts();
+
       try {
         if (mfw != null) {
           // compaction must not have finished successfully, so close its 
output file
@@ -397,7 +464,7 @@ public class FileCompactor implements 
Callable<CompactionStats> {
       }
 
       CountingIterator citr =
-          new CountingIterator(new MultiIterator(iters, extent.toDataRange()), 
entriesRead);
+          new CountingIterator(new MultiIterator(iters, extent.toDataRange()), 
currentEntriesRead);
       SortedKeyValueIterator<Key,Value> delIter =
           DeletingIterator.wrap(citr, propagateDeletes, 
DeletingIterator.getBehavior(acuTableConf));
       ColumnFamilySkippingIterator cfsi = new 
ColumnFamilySkippingIterator(delIter);
@@ -425,7 +492,7 @@ public class FileCompactor implements 
Callable<CompactionStats> {
 
           if (entriesCompacted % 1024 == 0) {
             // Periodically update stats, do not want to do this too often 
since its volatile
-            entriesWritten.addAndGet(1024);
+            currentEntriesWritten.addAndGet(1024);
           }
         }
 
@@ -480,11 +547,11 @@ public class FileCompactor implements 
Callable<CompactionStats> {
   }
 
   long getEntriesRead() {
-    return entriesRead.get();
+    return currentEntriesRead.get();
   }
 
   long getEntriesWritten() {
-    return entriesWritten.get();
+    return currentEntriesWritten.get();
   }
 
   long getStartTime() {
diff --git 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 2b85401516..e5d0c186f3 100644
--- 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -119,6 +119,7 @@ import org.slf4j.LoggerFactory;
 import com.beust.jcommander.Parameter;
 import com.google.common.base.Preconditions;
 
+import io.micrometer.core.instrument.FunctionCounter;
 import io.micrometer.core.instrument.LongTaskTimer;
 import io.micrometer.core.instrument.MeterRegistry;
 
@@ -162,8 +163,23 @@ public class Compactor extends AbstractServer implements 
MetricsProducer, Compac
     queueName = opts.getQueueName();
   }
 
+  /**
+   * Value function for the entries read counter; delegates to the static total kept by
+   * {@link FileCompactor}.
+   */
+  private long getTotalEntriesRead() {
+    final long entriesRead = FileCompactor.getTotalEntriesRead();
+    return entriesRead;
+  }
+
+  /**
+   * Value function for the entries written counter; delegates to the static total kept by
+   * {@link FileCompactor}.
+   */
+  private long getTotalEntriesWritten() {
+    final long entriesWritten = FileCompactor.getTotalEntriesWritten();
+    return entriesWritten;
+  }
+
   @Override
   public void registerMetrics(MeterRegistry registry) {
+    FunctionCounter.builder(METRICS_COMPACTOR_ENTRIES_READ, this, 
Compactor::getTotalEntriesRead)
+        .description("Number of entries read by all compactions that have run 
on this compactor")
+        .register(registry);
+    FunctionCounter
+        .builder(METRICS_COMPACTOR_ENTRIES_WRITTEN, this, 
Compactor::getTotalEntriesWritten)
+        .description("Number of entries written by all compactions that have 
run on this compactor")
+        .register(registry);
     LongTaskTimer timer = LongTaskTimer.builder(METRICS_COMPACTOR_MAJC_STUCK)
         .description("Number and duration of stuck major 
compactions").register(registry);
     CompactionWatcher.setTimer(timer);
@@ -690,20 +706,20 @@ public class Compactor extends AbstractServer implements 
MetricsProducer, Compac
               // Compaction has started. There should only be one in the list
               CompactionInfo info = running.get(0);
               if (info != null) {
+                final long entriesRead = info.getEntriesRead();
+                final long entriesWritten = info.getEntriesWritten();
                 if (inputEntries > 0) {
-                  percentComplete =
-                      Float.toString((info.getEntriesRead() / (float) 
inputEntries) * 100);
+                  percentComplete = Float.toString((entriesRead / (float) 
inputEntries) * 100);
                 }
                 String message = String.format(
                     "Compaction in progress, read %d of %d input entries ( %s 
%s ), written %d entries",
-                    info.getEntriesRead(), inputEntries, percentComplete, "%",
-                    info.getEntriesWritten());
+                    entriesRead, inputEntries, percentComplete, "%", 
entriesWritten);
                 watcher.run();
                 try {
                   LOG.debug("Updating coordinator with compaction progress: 
{}.", message);
                   TCompactionStatusUpdate update =
                       new 
TCompactionStatusUpdate(TCompactionState.IN_PROGRESS, message,
-                          inputEntries, info.getEntriesRead(), 
info.getEntriesWritten());
+                          inputEntries, entriesRead, entriesWritten);
                   updateCompactionState(job, update);
                 } catch (RetriesExceededException e) {
                   LOG.warn("Error updating coordinator with compaction 
progress, error: {}",
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java
index 0695181c87..70b0c4980b 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java
@@ -20,8 +20,10 @@ package org.apache.accumulo.tserver.metrics;
 
 import org.apache.accumulo.core.metrics.MetricsProducer;
 import org.apache.accumulo.server.compaction.CompactionWatcher;
+import org.apache.accumulo.server.compaction.FileCompactor;
 import org.apache.accumulo.tserver.TabletServer;
 
+import io.micrometer.core.instrument.FunctionCounter;
 import io.micrometer.core.instrument.Gauge;
 import io.micrometer.core.instrument.LongTaskTimer;
 import io.micrometer.core.instrument.MeterRegistry;
@@ -34,8 +36,25 @@ public class TabletServerMetrics implements MetricsProducer {
     util = new TabletServerMetricsUtil(tserver);
   }
 
+  /**
+   * Value function for the entries read counter registered by this class; delegates to the
+   * static total kept by {@link FileCompactor}.
+   */
+  private long getTotalEntriesRead() {
+    final long entriesRead = FileCompactor.getTotalEntriesRead();
+    return entriesRead;
+  }
+
+  /**
+   * Value function for the entries written counter registered by this class; delegates to the
+   * static total kept by {@link FileCompactor}.
+   */
+  private long getTotalEntriesWritten() {
+    final long entriesWritten = FileCompactor.getTotalEntriesWritten();
+    return entriesWritten;
+  }
+
   @Override
   public void registerMetrics(MeterRegistry registry) {
+    FunctionCounter
+        .builder(METRICS_COMPACTOR_ENTRIES_READ, this, 
TabletServerMetrics::getTotalEntriesRead)
+        .description("Number of entries read by all compactions that have run 
on this tserver")
+        .register(registry);
+    FunctionCounter
+        .builder(METRICS_COMPACTOR_ENTRIES_WRITTEN, this,
+            TabletServerMetrics::getTotalEntriesWritten)
+        .description("Number of entries written by all compactions that have 
run on this tserver")
+        .register(registry);
     LongTaskTimer timer = LongTaskTimer.builder(METRICS_TSERVER_MAJC_STUCK)
         .description("Number and duration of stuck major 
compactions").register(registry);
     CompactionWatcher.setTimer(timer);
diff --git 
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
 
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
index a9afc4fd43..0cbbc3a97a 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
@@ -36,6 +36,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.accumulo.compactor.Compactor;
 import org.apache.accumulo.coordinator.CompactionCoordinator;
@@ -49,14 +50,21 @@ import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.iterators.IteratorUtil;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
+import org.apache.accumulo.core.metrics.MetricsProducer;
 import org.apache.accumulo.core.util.compaction.RunningCompactionInfo;
 import org.apache.accumulo.core.util.threads.Threads;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
 import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
 import org.apache.accumulo.test.functional.SlowIterator;
+import org.apache.accumulo.test.metrics.TestStatsDRegistryFactory;
+import org.apache.accumulo.test.metrics.TestStatsDSink;
+import org.apache.accumulo.test.util.Wait;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.thrift.TException;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -79,11 +87,117 @@ public class ExternalCompactionProgressIT extends 
AccumuloClusterHarness {
   Map<String,RunningCompactionInfo> runningMap = new HashMap<>();
   List<EC_PROGRESS> progressList = new ArrayList<>();
 
-  private final AtomicBoolean compactionFinished = new AtomicBoolean(false);
+  private static final AtomicBoolean stopCheckerThread = new 
AtomicBoolean(false);
+  private static TestStatsDSink sink;
+
+  /** Creates the StatsD sink shared by every test in this class. */
+  @BeforeAll
+  public static void before() throws Exception {
+    ExternalCompactionProgressIT.sink = new TestStatsDSink();
+  }
+
+  /** Closes the shared StatsD sink, if one was created. */
+  @AfterAll
+  public static void after() throws Exception {
+    if (sink == null) {
+      return;
+    }
+    sink.close();
+  }
+
+  /** Clears the shared stop flag so checker threads started by the next test keep looping. */
+  @BeforeEach
+  public void setup() {
+    stopCheckerThread.set(false);
+  }
 
   @Override
   public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration 
coreSite) {
     ExternalCompactionTestUtils.configureMiniCluster(cfg, coreSite);
+    // Enable Micrometer with the test StatsD registry factory, and hand the factory the
+    // loopback host and the port of the sink owned by this test class.
+    cfg.setProperty(Property.GENERAL_MICROMETER_ENABLED, "true");
+    cfg.setProperty(Property.GENERAL_MICROMETER_FACTORY, TestStatsDRegistryFactory.class.getName());
+    final String sinkPort = Integer.toString(sink.getPort());
+    cfg.setSystemProperties(Map.of(TestStatsDRegistryFactory.SERVER_HOST, "127.0.0.1",
+        TestStatsDRegistryFactory.SERVER_PORT, sinkPort));
+  }
+
+  /**
+   * Runs an external compaction slowed down by {@link SlowIterator} and waits until the entries
+   * read and entries written values collected from the StatsD sink add up to the expected totals.
+   */
+  @Test
+  public void testProgressViaMetrics() throws Exception {
+    String table = this.getUniqueNames(1)[0];
+
+    try (AccumuloClient client =
+        Accumulo.newClient().from(getCluster().getClientProperties()).build()) {
+      createTable(client, table, "cs1");
+      writeData(client, table, ROWS);
+
+      cluster.getClusterControl().startCompactors(Compactor.class, 1, QUEUE1);
+      cluster.getClusterControl().startCoordinator(CompactionCoordinator.class);
+
+      // The expected values never change, so plain constants are enough.
+      final long expectedEntriesRead = 9216;
+      final long expectedEntriesWritten = 4096;
+      final AtomicLong totalEntriesRead = new AtomicLong(0);
+      final AtomicLong totalEntriesWritten = new AtomicLong(0);
+
+      Thread checkerThread = getMetricsCheckerThread(totalEntriesRead, totalEntriesWritten);
+      checkerThread.start();
+
+      try {
+        IteratorSetting setting = new IteratorSetting(50, "Slow", SlowIterator.class);
+        SlowIterator.setSleepTime(setting, 1);
+        client.tableOperations().attachIterator(table, setting,
+            EnumSet.of(IteratorUtil.IteratorScope.majc));
+        log.info("Compacting table");
+        compact(client, table, 2, QUEUE1, true);
+        log.info("Done Compacting table");
+        verify(client, table, 2, ROWS);
+
+        Wait.waitFor(() -> {
+          if (totalEntriesRead.get() == expectedEntriesRead
+              && totalEntriesWritten.get() == expectedEntriesWritten) {
+            return true;
+          }
+          log.info(
+              "Waiting for entries read to be {} (currently {}) and entries written to be {} (currently {})",
+              expectedEntriesRead, totalEntriesRead.get(), expectedEntriesWritten,
+              totalEntriesWritten.get());
+          return false;
+        }, 30000, 3000, "Entries read and written metrics values did not match expected values");
+      } finally {
+        // Always stop the tailer, even when the compaction, verification or wait fails;
+        // otherwise it would keep polling the shared sink while later tests run.
+        stopCheckerThread.set(true);
+        checkerThread.join();
+      }
+    }
+  }
+
+  /**
+   * Get a thread that checks the metrics for entries read and written.
+   *
+   * @param totalEntriesRead incremented by the value of every entries read metric line received
+   * @param totalEntriesWritten incremented by the value of every entries written metric line
+   *        received
+   * @return an unstarted thread that polls the sink until {@code stopCheckerThread} is set
+   */
+  private static Thread getMetricsCheckerThread(AtomicLong totalEntriesRead,
+      AtomicLong totalEntriesWritten) {
+    return Threads.createThread("metric-tailer", () -> {
+      log.info("Starting metric tailer");
+
+      // NOTE(review): presumably discards lines received before the tailer started; this relies
+      // on how TestStatsDSink.getLines() hands out its lines - confirm.
+      sink.getLines().clear();
+
+      while (!stopCheckerThread.get()) {
+        List<String> statsDMetrics = sink.getLines();
+        for (String s : statsDMetrics) {
+          if (stopCheckerThread.get()) {
+            break;
+          }
+          // The counters and the accumulators are longs, so parse the reported values as long.
+          if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_ENTRIES_READ)) {
+            TestStatsDSink.Metric e = TestStatsDSink.parseStatsDMetric(s);
+            long value = Long.parseLong(e.getValue());
+            totalEntriesRead.addAndGet(value);
+            log.info("Found entries.read metric: {} with value: {}", e.getName(), value);
+          } else if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_ENTRIES_WRITTEN)) {
+            TestStatsDSink.Metric e = TestStatsDSink.parseStatsDMetric(s);
+            long value = Long.parseLong(e.getValue());
+            totalEntriesWritten.addAndGet(value);
+            log.info("Found entries.written metric: {} with value: {}", e.getName(), value);
+          }
+        }
+        sleepUninterruptibly(3000, TimeUnit.MILLISECONDS);
+      }
+      log.info("Metric tailer thread finished");
+    });
   }
 
   @Test
@@ -109,7 +223,7 @@ public class ExternalCompactionProgressIT extends 
AccumuloClusterHarness {
       verify(client, table1, 2, ROWS);
 
       log.info("Done Compacting table");
-      compactionFinished.set(true);
+      stopCheckerThread.set(true);
       checkerThread.join();
 
       verifyProgress();
@@ -164,7 +278,7 @@ public class ExternalCompactionProgressIT extends 
AccumuloClusterHarness {
       client.tableOperations().compact(tableName2,
           new CompactionConfig().setWait(true).setIterators(List.of(setting)));
       log.info("Finished compacting table " + tableName2);
-      compactionFinished.set(true);
+      stopCheckerThread.set(true);
 
       log.info("Waiting on progress checker thread");
       checkerThread.join();
@@ -194,7 +308,7 @@ public class ExternalCompactionProgressIT extends 
AccumuloClusterHarness {
   public Thread startChecker() {
     return Threads.createThread("RC checker", () -> {
       try {
-        while (!compactionFinished.get()) {
+        while (!stopCheckerThread.get()) {
           checkRunning();
           sleepUninterruptibly(1000, TimeUnit.MILLISECONDS);
         }

Reply via email to