This is an automated email from the ASF dual-hosted git repository. domgarguilo pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 23d0ad752b29b6937eb0c98bdc101e0220fbc463 Merge: 510cfda7b6 d6eb1df8ee Author: Dom Garguilo <domgargu...@apache.org> AuthorDate: Wed May 29 15:08:29 2024 -0400 Merge remote-tracking branch 'upstream/2.1' .../accumulo/core/metrics/MetricsProducer.java | 2 + .../accumulo/server/compaction/FileCompactor.java | 88 ++++++++++++-- .../org/apache/accumulo/compactor/Compactor.java | 27 ++++- .../tserver/metrics/TabletServerMetrics.java | 19 ++++ .../compaction/ExternalCompactionProgressIT.java | 126 ++++++++++++++++++++- 5 files changed, 242 insertions(+), 20 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java index 19161e80b0,228baa2165..ca3d90ccf1 --- a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java +++ b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java @@@ -566,9 -581,10 +566,11 @@@ public interface MetricsProducer Logger LOG = LoggerFactory.getLogger(MetricsProducer.class); + String METRICS_LOW_MEMORY = "accumulo.detected.low.memory"; String METRICS_COMPACTOR_PREFIX = "accumulo.compactor."; String METRICS_COMPACTOR_MAJC_STUCK = METRICS_COMPACTOR_PREFIX + "majc.stuck"; + String METRICS_COMPACTOR_ENTRIES_READ = METRICS_COMPACTOR_PREFIX + "entries.read"; + String METRICS_COMPACTOR_ENTRIES_WRITTEN = METRICS_COMPACTOR_PREFIX + "entries.written"; String METRICS_FATE_PREFIX = "accumulo.fate."; String METRICS_FATE_TYPE_IN_PROGRESS = METRICS_FATE_PREFIX + "ops.in.progress.by.type"; diff --cc server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java index 5fb112da79,77ec9f1696..e2ea4c46b2 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java @@@ -31,8 -32,8 +32,9 @@@ import java.util.Map import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; + import java.util.concurrent.atomic.LongAdder; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.conf.AccumuloConfiguration; @@@ -122,9 -122,19 +124,21 @@@ public class FileCompactor implements C private String currentLocalityGroup = ""; private final long startTime; - private final AtomicLong entriesRead = new AtomicLong(0); - private final AtomicLong entriesWritten = new AtomicLong(0); + private final AtomicInteger timesPaused = new AtomicInteger(0); ++ + private final AtomicLong currentEntriesRead = new AtomicLong(0); + private final AtomicLong currentEntriesWritten = new AtomicLong(0); + + // These track the cumulative count of entries (read and written) that has been recorded in + // the global counts. Their purpose is to avoid double counting of metrics during the update of + // global statistics. + private final AtomicLong lastRecordedEntriesRead = new AtomicLong(0); + private final AtomicLong lastRecordedEntriesWritten = new AtomicLong(0); + + private static final LongAdder totalEntriesRead = new LongAdder(); + private static final LongAdder totalEntriesWritten = new LongAdder(); + private static volatile long lastUpdateTime = 0; + private final DateFormat dateFormatter = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS"); // a unique id to identify a compactor @@@ -144,12 -154,63 +158,64 @@@ return currentLocalityGroup; } - private void clearStats() { - entriesRead.set(0); - entriesWritten.set(0); + private void clearCurrentEntryCounts() { + currentEntriesRead.set(0); + currentEntriesWritten.set(0); + timesPaused.set(0); } + private void updateGlobalEntryCounts() { + updateTotalEntries(currentEntriesRead, lastRecordedEntriesRead, totalEntriesRead); + updateTotalEntries(currentEntriesWritten, lastRecordedEntriesWritten, totalEntriesWritten); + } + + /** + * Updates the total count of entries by adding the difference between the current count and the + * last recorded count to the total. + * + * @param current The current count of entries + * @param recorded The last recorded count of entries + * @param total The total count to add the difference to + */ + private void updateTotalEntries(AtomicLong current, AtomicLong recorded, LongAdder total) { + long currentCount = current.get(); + long lastRecorded = + recorded.getAndUpdate(recordedValue -> Math.max(recordedValue, currentCount)); + if (lastRecorded < currentCount) { + total.add(currentCount - lastRecorded); + } + } + + /** + * @return the total entries written by compactions over the lifetime of this process. + */ + public static long getTotalEntriesWritten() { + updateTotalEntries(); + return totalEntriesWritten.sum(); + } + + /** + * @return the total entries read by compactions over the lifetime of this process. + */ + public static long getTotalEntriesRead() { + updateTotalEntries(); + return totalEntriesRead.sum(); + } + + /** + * Updates total entries read and written for all currently running compactions. Compactions will + * update the global stats when they finish. This can be called to update them sooner. This method + * is rate limited, so it will not cause issues if called too frequently. + */ + private static void updateTotalEntries() { + long currentTime = System.nanoTime(); + if (currentTime - lastUpdateTime < Duration.ofMillis(100).toNanos()) { + return; + } + runningCompactions.forEach(FileCompactor::updateGlobalEntryCounts); + lastUpdateTime = currentTime; + } + protected static final Set<FileCompactor> runningCompactions = Collections.synchronizedSet(new HashSet<>()); @@@ -510,13 -551,9 +578,13 @@@ } long getEntriesWritten() { - return entriesWritten.get(); + return currentEntriesWritten.get(); } + long getTimesPaused() { + return timesPaused.get(); + } + long getStartTime() { return startTime; } diff --cc server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index fbba00bab6,e5d0c186f3..b44fcbbca1 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@@ -118,9 -116,10 +118,10 @@@ import org.apache.zookeeper.KeeperExcep import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.beust.jcommander.Parameter; import com.google.common.base.Preconditions; +import com.google.common.net.HostAndPort; + import io.micrometer.core.instrument.FunctionCounter; import io.micrometer.core.instrument.LongTaskTimer; import io.micrometer.core.instrument.MeterRegistry; @@@ -147,14 -158,28 +148,29 @@@ public class Compactor extends Abstract private final AtomicBoolean compactionRunning = new AtomicBoolean(false); - protected Compactor(CompactorServerOpts opts, String[] args) { + protected Compactor(ConfigOpts opts, String[] args) { super("compactor", opts, args); - queueName = opts.getQueueName(); + queueName = super.getConfiguration().get(Property.COMPACTOR_QUEUE_NAME); } + private long getTotalEntriesRead() { + return FileCompactor.getTotalEntriesRead(); + } + + private long getTotalEntriesWritten() { + return FileCompactor.getTotalEntriesWritten(); + } + @Override public void registerMetrics(MeterRegistry registry) { + super.registerMetrics(registry); + FunctionCounter.builder(METRICS_COMPACTOR_ENTRIES_READ, this, Compactor::getTotalEntriesRead) + .description("Number of entries read by all compactions that have run on this compactor") + .register(registry); + FunctionCounter + .builder(METRICS_COMPACTOR_ENTRIES_WRITTEN, this, Compactor::getTotalEntriesWritten) + .description("Number of entries written by all compactions that have run on this compactor") + .register(registry); LongTaskTimer timer = LongTaskTimer.builder(METRICS_COMPACTOR_MAJC_STUCK) .description("Number and duration of stuck major compactions").register(registry); CompactionWatcher.setTimer(timer); @@@ -682,14 -706,14 +698,15 @@@ // Compaction has started. There should only be one in the list CompactionInfo info = running.get(0); if (info != null) { + final long entriesRead = info.getEntriesRead(); + final long entriesWritten = info.getEntriesWritten(); if (inputEntries > 0) { - percentComplete = - Float.toString((info.getEntriesRead() / (float) inputEntries) * 100); + percentComplete = Float.toString((entriesRead / (float) inputEntries) * 100); } String message = String.format( - "Compaction in progress, read %d of %d input entries ( %s %s ), written %d entries", - entriesRead, inputEntries, percentComplete, "%", entriesWritten); + "Compaction in progress, read %d of %d input entries ( %s %s ), written %d entries, paused %d times", - info.getEntriesRead(), inputEntries, percentComplete, "%", - info.getEntriesWritten(), info.getTimesPaused()); ++ entriesRead, inputEntries, percentComplete, "%", entriesWritten, ++ info.getTimesPaused()); watcher.run(); try { LOG.debug("Updating coordinator with compaction progress: {}.", message); diff --cc test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java index 0d7936823b,0cbbc3a97a..f77f2ea6a9 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java @@@ -18,6 -18,7 +18,7 @@@ */ package org.apache.accumulo.test.compaction; -import static org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly; ++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QUEUE1; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.compact; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.createTable; @@@ -192,14 -308,9 +308,14 @@@ public class ExternalCompactionProgress public Thread startChecker() { return Threads.createThread("RC checker", () -> { try { - while (!compactionFinished.get()) { + while (!stopCheckerThread.get()) { checkRunning(); - sleepUninterruptibly(1000, TimeUnit.MILLISECONDS); + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + log.debug("interrupted during sleep, forcing compaction finished as completed"); - compactionFinished.set(true); ++ stopCheckerThread.set(true); + } } } catch (TException e) { log.warn("{}", e.getMessage(), e);