This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 23d0ad752b29b6937eb0c98bdc101e0220fbc463
Merge: 510cfda7b6 d6eb1df8ee
Author: Dom Garguilo <domgargu...@apache.org>
AuthorDate: Wed May 29 15:08:29 2024 -0400

    Merge remote-tracking branch 'upstream/2.1'

 .../accumulo/core/metrics/MetricsProducer.java     |   2 +
 .../accumulo/server/compaction/FileCompactor.java  |  88 ++++++++++++--
 .../org/apache/accumulo/compactor/Compactor.java   |  27 ++++-
 .../tserver/metrics/TabletServerMetrics.java       |  19 ++++
 .../compaction/ExternalCompactionProgressIT.java   | 126 ++++++++++++++++++++-
 5 files changed, 242 insertions(+), 20 deletions(-)

diff --cc 
core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
index 19161e80b0,228baa2165..ca3d90ccf1
--- a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
+++ b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
@@@ -566,9 -581,10 +566,11 @@@ public interface MetricsProducer 
  
    Logger LOG = LoggerFactory.getLogger(MetricsProducer.class);
  
 +  String METRICS_LOW_MEMORY = "accumulo.detected.low.memory";
    String METRICS_COMPACTOR_PREFIX = "accumulo.compactor.";
    String METRICS_COMPACTOR_MAJC_STUCK = METRICS_COMPACTOR_PREFIX + 
"majc.stuck";
+   String METRICS_COMPACTOR_ENTRIES_READ = METRICS_COMPACTOR_PREFIX + 
"entries.read";
+   String METRICS_COMPACTOR_ENTRIES_WRITTEN = METRICS_COMPACTOR_PREFIX + 
"entries.written";
  
    String METRICS_FATE_PREFIX = "accumulo.fate.";
    String METRICS_FATE_TYPE_IN_PROGRESS = METRICS_FATE_PREFIX + 
"ops.in.progress.by.type";
diff --cc 
server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
index 5fb112da79,77ec9f1696..e2ea4c46b2
--- 
a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
@@@ -31,8 -32,8 +32,9 @@@ import java.util.Map
  import java.util.Map.Entry;
  import java.util.Set;
  import java.util.concurrent.Callable;
 +import java.util.concurrent.atomic.AtomicInteger;
  import java.util.concurrent.atomic.AtomicLong;
+ import java.util.concurrent.atomic.LongAdder;
  
  import org.apache.accumulo.core.client.IteratorSetting;
  import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@@ -122,9 -122,19 +124,21 @@@ public class FileCompactor implements C
    private String currentLocalityGroup = "";
    private final long startTime;
  
-   private final AtomicLong entriesRead = new AtomicLong(0);
-   private final AtomicLong entriesWritten = new AtomicLong(0);
 +  private final AtomicInteger timesPaused = new AtomicInteger(0);
++
+   private final AtomicLong currentEntriesRead = new AtomicLong(0);
+   private final AtomicLong currentEntriesWritten = new AtomicLong(0);
+ 
+   // These track the cumulative count of entries (read and written) that has been recorded in
+   // the global counts. Their purpose is to avoid double counting of metrics during the update of
+   // global statistics.
+   private final AtomicLong lastRecordedEntriesRead = new AtomicLong(0);
+   private final AtomicLong lastRecordedEntriesWritten = new AtomicLong(0);
+ 
+   private static final LongAdder totalEntriesRead = new LongAdder();
+   private static final LongAdder totalEntriesWritten = new LongAdder();
+   private static volatile long lastUpdateTime = 0;
+ 
    private final DateFormat dateFormatter = new SimpleDateFormat("yyyy/MM/dd 
HH:mm:ss.SSS");
  
    // a unique id to identify a compactor
@@@ -144,12 -154,63 +158,64 @@@
      return currentLocalityGroup;
    }
  
-   private void clearStats() {
-     entriesRead.set(0);
-     entriesWritten.set(0);
+   private void clearCurrentEntryCounts() {
+     currentEntriesRead.set(0);
+     currentEntriesWritten.set(0);
 +    timesPaused.set(0);
    }
  
+   private void updateGlobalEntryCounts() {
+     updateTotalEntries(currentEntriesRead, lastRecordedEntriesRead, 
totalEntriesRead);
+     updateTotalEntries(currentEntriesWritten, lastRecordedEntriesWritten, 
totalEntriesWritten);
+   }
+ 
+   /**
+    * Updates the total count of entries by adding the difference between the current count and the
+    * last recorded count to the total.
+    *
+    * @param current The current count of entries
+    * @param recorded The last recorded count of entries
+    * @param total The total count to add the difference to
+    */
+   private void updateTotalEntries(AtomicLong current, AtomicLong recorded, 
LongAdder total) {
+     long currentCount = current.get();
+     long lastRecorded =
+         recorded.getAndUpdate(recordedValue -> Math.max(recordedValue, 
currentCount));
+     if (lastRecorded < currentCount) {
+       total.add(currentCount - lastRecorded);
+     }
+   }
+ 
+   /**
+    * @return the total entries written by compactions over the lifetime of this process.
+    */
+   public static long getTotalEntriesWritten() {
+     updateTotalEntries();
+     return totalEntriesWritten.sum();
+   }
+ 
+   /**
+    * @return the total entries read by compactions over the lifetime of this process.
+    */
+   public static long getTotalEntriesRead() {
+     updateTotalEntries();
+     return totalEntriesRead.sum();
+   }
+ 
+   /**
+    * Updates total entries read and written for all currently running compactions. Compactions will
+    * update the global stats when they finish. This can be called to update them sooner. This method
+    * is rate limited, so it will not cause issues if called too frequently.
+    */
+   private static void updateTotalEntries() {
+     long currentTime = System.nanoTime();
+     if (currentTime - lastUpdateTime < Duration.ofMillis(100).toNanos()) {
+       return;
+     }
+     runningCompactions.forEach(FileCompactor::updateGlobalEntryCounts);
+     lastUpdateTime = currentTime;
+   }
+ 
    protected static final Set<FileCompactor> runningCompactions =
        Collections.synchronizedSet(new HashSet<>());
  
@@@ -510,13 -551,9 +578,13 @@@
    }
  
    long getEntriesWritten() {
-     return entriesWritten.get();
+     return currentEntriesWritten.get();
    }
  
 +  long getTimesPaused() {
 +    return timesPaused.get();
 +  }
 +
    long getStartTime() {
      return startTime;
    }
diff --cc 
server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index fbba00bab6,e5d0c186f3..b44fcbbca1
--- 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@@ -118,9 -116,10 +118,10 @@@ import org.apache.zookeeper.KeeperExcep
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
 -import com.beust.jcommander.Parameter;
  import com.google.common.base.Preconditions;
 +import com.google.common.net.HostAndPort;
  
+ import io.micrometer.core.instrument.FunctionCounter;
  import io.micrometer.core.instrument.LongTaskTimer;
  import io.micrometer.core.instrument.MeterRegistry;
  
@@@ -147,14 -158,28 +148,29 @@@ public class Compactor extends Abstract
  
    private final AtomicBoolean compactionRunning = new AtomicBoolean(false);
  
 -  protected Compactor(CompactorServerOpts opts, String[] args) {
 +  protected Compactor(ConfigOpts opts, String[] args) {
      super("compactor", opts, args);
 -    queueName = opts.getQueueName();
 +    queueName = super.getConfiguration().get(Property.COMPACTOR_QUEUE_NAME);
    }
  
+   private long getTotalEntriesRead() {
+     return FileCompactor.getTotalEntriesRead();
+   }
+ 
+   private long getTotalEntriesWritten() {
+     return FileCompactor.getTotalEntriesWritten();
+   }
+ 
    @Override
    public void registerMetrics(MeterRegistry registry) {
 +    super.registerMetrics(registry);
+     FunctionCounter.builder(METRICS_COMPACTOR_ENTRIES_READ, this, 
Compactor::getTotalEntriesRead)
+         .description("Number of entries read by all compactions that have run 
on this compactor")
+         .register(registry);
+     FunctionCounter
+         .builder(METRICS_COMPACTOR_ENTRIES_WRITTEN, this, 
Compactor::getTotalEntriesWritten)
+         .description("Number of entries written by all compactions that have 
run on this compactor")
+         .register(registry);
      LongTaskTimer timer = LongTaskTimer.builder(METRICS_COMPACTOR_MAJC_STUCK)
          .description("Number and duration of stuck major 
compactions").register(registry);
      CompactionWatcher.setTimer(timer);
@@@ -682,14 -706,14 +698,15 @@@
                // Compaction has started. There should only be one in the list
                CompactionInfo info = running.get(0);
                if (info != null) {
+                 final long entriesRead = info.getEntriesRead();
+                 final long entriesWritten = info.getEntriesWritten();
                  if (inputEntries > 0) {
-                   percentComplete =
-                       Float.toString((info.getEntriesRead() / (float) 
inputEntries) * 100);
+                   percentComplete = Float.toString((entriesRead / (float) 
inputEntries) * 100);
                  }
                  String message = String.format(
 -                    "Compaction in progress, read %d of %d input entries ( %s 
%s ), written %d entries",
 -                    entriesRead, inputEntries, percentComplete, "%", 
entriesWritten);
 +                    "Compaction in progress, read %d of %d input entries ( %s 
%s ), written %d entries, paused %d times",
-                     info.getEntriesRead(), inputEntries, percentComplete, "%",
-                     info.getEntriesWritten(), info.getTimesPaused());
++                    entriesRead, inputEntries, percentComplete, "%", 
entriesWritten,
++                    info.getTimesPaused());
                  watcher.run();
                  try {
                    LOG.debug("Updating coordinator with compaction progress: 
{}.", message);
diff --cc 
test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
index 0d7936823b,0cbbc3a97a..f77f2ea6a9
--- 
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
@@@ -18,6 -18,7 +18,7 @@@
   */
  package org.apache.accumulo.test.compaction;
  
 -import static 
org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly;
++import static 
com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
  import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QUEUE1;
  import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.compact;
  import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.createTable;
@@@ -192,14 -308,9 +308,14 @@@ public class ExternalCompactionProgress
    public Thread startChecker() {
      return Threads.createThread("RC checker", () -> {
        try {
-         while (!compactionFinished.get()) {
+         while (!stopCheckerThread.get()) {
            checkRunning();
 -          sleepUninterruptibly(1000, TimeUnit.MILLISECONDS);
 +          try {
 +            Thread.sleep(1000);
 +          } catch (InterruptedException ex) {
 +            log.debug("interrupted during sleep, forcing compaction finished 
as completed");
-             compactionFinished.set(true);
++            stopCheckerThread.set(true);
 +          }
          }
        } catch (TException e) {
          log.warn("{}", e.getMessage(), e);

Reply via email to