This is an automated email from the ASF dual-hosted git repository. domgarguilo pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 1261ec2472 Convert micrometer Counters to FunctionCounters (#4555) 1261ec2472 is described below commit 1261ec24726f65fca3b7314d00c17c062a8dfd6c Author: Dom G <domgargu...@apache.org> AuthorDate: Fri May 17 14:08:21 2024 -0400 Convert micrometer Counters to FunctionCounters (#4555) --- .../server/compaction/PausedCompactionMetrics.java | 20 +++--- .../apache/accumulo/tserver/ScanServerMetrics.java | 10 +-- .../accumulo/tserver/ThriftScanClientHandler.java | 16 ++--- .../tserver/metrics/TabletServerScanMetrics.java | 84 +++++++++------------- .../tserver/metrics/TabletServerUpdateMetrics.java | 27 ++++--- .../apache/accumulo/tserver/tablet/TabletBase.java | 2 +- 6 files changed, 75 insertions(+), 84 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/PausedCompactionMetrics.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/PausedCompactionMetrics.java index 3d1a07be42..29e6da06d0 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/PausedCompactionMetrics.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/PausedCompactionMetrics.java @@ -18,30 +18,32 @@ */ package org.apache.accumulo.server.compaction; +import java.util.concurrent.atomic.AtomicLong; + import org.apache.accumulo.core.metrics.MetricsProducer; -import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.FunctionCounter; import io.micrometer.core.instrument.MeterRegistry; public class PausedCompactionMetrics implements MetricsProducer { - private Counter majcPauses; - private Counter mincPauses; + private final AtomicLong majcPauseCount = new AtomicLong(0); + private final AtomicLong mincPauseCount = new AtomicLong(0); public void incrementMinCPause() { - mincPauses.increment(); + mincPauseCount.incrementAndGet(); } public void incrementMajCPause() { - majcPauses.increment(); + majcPauseCount.incrementAndGet(); } @Override public void registerMetrics(MeterRegistry registry) { - majcPauses = Counter.builder(METRICS_MAJC_PAUSED).description("major compaction pause count") - .register(registry); - mincPauses = Counter.builder(METRICS_MINC_PAUSED).description("minor compactor pause count") - .register(registry); + FunctionCounter.builder(METRICS_MAJC_PAUSED, majcPauseCount, AtomicLong::get) + .description("major compaction pause count").register(registry); + FunctionCounter.builder(METRICS_MINC_PAUSED, mincPauseCount, AtomicLong::get) + .description("minor compactor pause count").register(registry); } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServerMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServerMetrics.java index 1a516b597b..37ddfab028 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServerMetrics.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServerMetrics.java @@ -18,13 +18,15 @@ */ package org.apache.accumulo.tserver; +import java.util.concurrent.atomic.AtomicLong; + import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metrics.MetricsProducer; import com.github.benmanes.caffeine.cache.LoadingCache; -import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.FunctionCounter; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Timer; import io.micrometer.core.instrument.binder.cache.CaffeineCacheMetrics; @@ -32,7 +34,7 @@ import io.micrometer.core.instrument.binder.cache.CaffeineCacheMetrics; public class ScanServerMetrics implements MetricsProducer { private Timer reservationTimer; - private Counter busyTimeoutCount; + private final AtomicLong busyTimeoutCount = new AtomicLong(0); private final LoadingCache<KeyExtent,TabletMetadata> tabletMetadataCache; @@ -44,7 +46,7 @@ public class ScanServerMetrics implements MetricsProducer { public void registerMetrics(MeterRegistry registry) { reservationTimer = Timer.builder(MetricsProducer.METRICS_SCAN_RESERVATION_TIMER) .description("Time to reserve a tablets files for scan").register(registry); - busyTimeoutCount = Counter.builder(METRICS_SCAN_BUSY_TIMEOUT_COUNTER) + FunctionCounter.builder(METRICS_SCAN_BUSY_TIMEOUT_COUNTER, busyTimeoutCount, AtomicLong::get) .description("The number of scans where a busy timeout happened").register(registry); CaffeineCacheMetrics.monitor(registry, tabletMetadataCache, METRICS_SCAN_TABLET_METADATA_CACHE); } @@ -54,6 +56,6 @@ public class ScanServerMetrics implements MetricsProducer { } public void incrementBusy() { - busyTimeoutCount.increment(); + busyTimeoutCount.incrementAndGet(); } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java index a89af6617d..71ea20e62d 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java @@ -159,7 +159,7 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface { ThriftSecurityException, org.apache.accumulo.core.tabletscan.thrift.TooManyFilesException, TSampleNotPresentException, ScanServerBusyException { - server.getScanMetrics().incrementStartScan(1.0D); + server.getScanMetrics().incrementStartScan(); TableId tableId = extent.tableId(); NamespaceId namespaceId; @@ -250,7 +250,7 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface { org.apache.accumulo.core.tabletscan.thrift.TooManyFilesException, TSampleNotPresentException, ScanServerBusyException { - server.getScanMetrics().incrementContinueScan(1.0D); + server.getScanMetrics().incrementContinueScan(); if (scanSession.nextBatchTask == null) { scanSession.nextBatchTask = new NextBatchTask(server, scanID, scanSession.interruptFlag); @@ -284,7 +284,7 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface { server.getSessionManager().removeSession(scanID); TabletBase tablet = scanSession.getTabletResolver().getTablet(scanSession.extent); if (busyTimeout > 0) { - server.getScanMetrics().incrementBusy(1.0D); + server.getScanMetrics().incrementBusy(); throw new ScanServerBusyException(); } else if (tablet == null || tablet.isClosed()) { throw new NotServingTabletException(scanSession.extent.toThrift()); @@ -326,7 +326,7 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface { @Override public void closeScan(TInfo tinfo, long scanID) { - server.getScanMetrics().incrementCloseScan(1.0D); + server.getScanMetrics().incrementCloseScan(); final SingleScanSession ss = (SingleScanSession) server.getSessionManager().removeSession(scanID); @@ -378,7 +378,7 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface { long busyTimeout) throws ThriftSecurityException, TSampleNotPresentException, ScanServerBusyException { - server.getScanMetrics().incrementStartScan(1.0D); + server.getScanMetrics().incrementStartScan(); // find all of the tables that need to be scanned final HashSet<TableId> tables = new HashSet<>(); @@ -470,7 +470,7 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface { private MultiScanResult continueMultiScan(long scanID, MultiScanSession session, long busyTimeout) throws TSampleNotPresentException, ScanServerBusyException { - server.getScanMetrics().incrementContinueScan(1.0D); + server.getScanMetrics().incrementContinueScan(); if (session.lookupTask == null) { session.lookupTask = new LookupTask(server, scanID); @@ -495,7 +495,7 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface { } catch (CancellationException ce) { server.getSessionManager().removeSession(scanID); if (busyTimeout > 0) { - server.getScanMetrics().incrementBusy(1.0D); + server.getScanMetrics().incrementBusy(); throw new ScanServerBusyException(); } else { log.warn("Failed to get multiscan result", ce); @@ -518,7 +518,7 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface { @Override public void closeMultiScan(TInfo tinfo, long scanID) throws NoSuchScanIDException { - server.getScanMetrics().incrementCloseScan(1.0D); + server.getScanMetrics().incrementCloseScan(); MultiScanSession session = (MultiScanSession) server.getSessionManager().removeSession(scanID); if (session == null) { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java index 8cc135369e..5ca15c5c9b 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java @@ -20,13 +20,14 @@ package org.apache.accumulo.tserver.metrics; import java.time.Duration; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; import org.apache.accumulo.core.metrics.MetricsProducer; import org.apache.accumulo.server.metrics.NoOpDistributionSummary; -import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.DistributionSummary; +import io.micrometer.core.instrument.FunctionCounter; import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Timer; @@ -37,50 +38,34 @@ public class TabletServerScanMetrics implements MetricsProducer { private Timer scans; private DistributionSummary resultsPerScan = new NoOpDistributionSummary(); private DistributionSummary yields = new NoOpDistributionSummary(); - private Counter startScanCalls; - private Counter continueScanCalls; - private Counter closeScanCalls; - private Counter busyTimeoutCount; - private Counter pausedForMemory; - private Counter earlyReturnForMemory; + private final AtomicLong startScanCalls = new AtomicLong(0); + private final AtomicLong continueScanCalls = new AtomicLong(0); + private final AtomicLong closeScanCalls = new AtomicLong(0); + private final AtomicLong busyTimeoutCount = new AtomicLong(0); + private final AtomicLong pausedForMemory = new AtomicLong(0); + private final AtomicLong earlyReturnForMemory = new AtomicLong(0); private final LongAdder lookupCount = new LongAdder(); private final LongAdder queryResultCount = new LongAdder(); private final LongAdder queryResultBytes = new LongAdder(); private final LongAdder scannedCount = new LongAdder(); - public void incrementLookupCount(long amount) { - this.lookupCount.add(amount); - } - - public long getLookupCount() { - return this.lookupCount.sum(); + public void incrementLookupCount() { + this.lookupCount.increment(); } public void incrementQueryResultCount(long amount) { this.queryResultCount.add(amount); } - public long getQueryResultCount() { - return this.queryResultCount.sum(); - } - public void incrementQueryResultBytes(long amount) { this.queryResultBytes.add(amount); } - public long getQueryByteCount() { - return this.queryResultBytes.sum(); - } - public LongAdder getScannedCounter() { return this.scannedCount; } - public long getScannedCount() { - return this.scannedCount.sum(); - } - public void addScan(long value) { scans.record(Duration.ofMillis(value)); } @@ -101,28 +86,28 @@ public class TabletServerScanMetrics implements MetricsProducer { openFiles.addAndGet(delta < 0 ? delta : delta * -1); } - public void incrementStartScan(double value) { - startScanCalls.increment(value); + public void incrementStartScan() { + startScanCalls.incrementAndGet(); } - public void incrementContinueScan(double value) { - continueScanCalls.increment(value); + public void incrementContinueScan() { + continueScanCalls.incrementAndGet(); } - public void incrementCloseScan(double value) { - closeScanCalls.increment(value); + public void incrementCloseScan() { + closeScanCalls.incrementAndGet(); } - public void incrementBusy(double value) { - busyTimeoutCount.increment(value); + public void incrementBusy() { + busyTimeoutCount.incrementAndGet(); } public void incrementScanPausedForLowMemory() { - pausedForMemory.increment(); + pausedForMemory.incrementAndGet(); } public void incrementEarlyReturnForLowMemory() { - earlyReturnForMemory.increment(); + earlyReturnForMemory.incrementAndGet(); } @Override @@ -134,31 +119,28 @@ public class TabletServerScanMetrics implements MetricsProducer { .description("Results per scan").register(registry); yields = DistributionSummary.builder(METRICS_SCAN_YIELDS).description("yields").register(registry); - startScanCalls = Counter.builder(METRICS_SCAN_START) + FunctionCounter.builder(METRICS_SCAN_START, this.startScanCalls, AtomicLong::get) .description("calls to start a scan / multiscan").register(registry); - continueScanCalls = Counter.builder(METRICS_SCAN_CONTINUE) + FunctionCounter.builder(METRICS_SCAN_CONTINUE, this.continueScanCalls, AtomicLong::get) .description("calls to continue a scan / multiscan").register(registry); - closeScanCalls = Counter.builder(METRICS_SCAN_CLOSE) + FunctionCounter.builder(METRICS_SCAN_CLOSE, this.closeScanCalls, AtomicLong::get) .description("calls to close a scan / multiscan").register(registry); - busyTimeoutCount = Counter.builder(METRICS_SCAN_BUSY_TIMEOUT_COUNTER) + FunctionCounter + .builder(METRICS_SCAN_BUSY_TIMEOUT_COUNTER, this.busyTimeoutCount, AtomicLong::get) .description("The number of scans where a busy timeout happened").register(registry); - Gauge.builder(METRICS_SCAN_QUERIES, this, TabletServerScanMetrics::getLookupCount) + FunctionCounter.builder(METRICS_SCAN_QUERIES, this.lookupCount, LongAdder::sum) .description("Number of queries").register(registry); - Gauge - .builder(METRICS_SCAN_QUERY_SCAN_RESULTS, this, - TabletServerScanMetrics::getQueryResultCount) - .description("Query rate (entries/sec)").register(registry); - Gauge - .builder(METRICS_SCAN_QUERY_SCAN_RESULTS_BYTES, this, - TabletServerScanMetrics::getQueryByteCount) - .description("Query rate (bytes/sec)").register(registry); - Gauge.builder(METRICS_SCAN_SCANNED_ENTRIES, this, TabletServerScanMetrics::getScannedCount) + FunctionCounter.builder(METRICS_SCAN_SCANNED_ENTRIES, this.scannedCount, LongAdder::sum) .description("Scanned rate").register(registry); - pausedForMemory = Counter.builder(METRICS_SCAN_PAUSED_FOR_MEM) + FunctionCounter.builder(METRICS_SCAN_PAUSED_FOR_MEM, this.pausedForMemory, AtomicLong::get) .description("scan paused due to server being low on memory").register(registry); - earlyReturnForMemory = Counter.builder(METRICS_SCAN_RETURN_FOR_MEM) + FunctionCounter.builder(METRICS_SCAN_RETURN_FOR_MEM, this.earlyReturnForMemory, AtomicLong::get) .description("scan returned results early due to server being low on memory") .register(registry); + Gauge.builder(METRICS_SCAN_QUERY_SCAN_RESULTS, this.queryResultCount, LongAdder::sum) + .description("Query rate (entries/sec)").register(registry); + Gauge.builder(METRICS_SCAN_QUERY_SCAN_RESULTS_BYTES, this.queryResultBytes, LongAdder::sum) + .description("Query rate (bytes/sec)").register(registry); } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerUpdateMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerUpdateMetrics.java index 09ad197b6d..6dbce396b1 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerUpdateMetrics.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerUpdateMetrics.java @@ -19,35 +19,36 @@ package org.apache.accumulo.tserver.metrics; import java.time.Duration; +import java.util.concurrent.atomic.AtomicLong; import org.apache.accumulo.core.metrics.MetricsProducer; import org.apache.accumulo.server.metrics.NoOpDistributionSummary; -import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.DistributionSummary; +import io.micrometer.core.instrument.FunctionCounter; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Timer; public class TabletServerUpdateMetrics implements MetricsProducer { - private Counter permissionErrorsCounter; - private Counter unknownTabletErrorsCounter; - private Counter constraintViolationsCounter; + private final AtomicLong permissionErrorsCount = new AtomicLong(); + private final AtomicLong unknownTabletErrorsCount = new AtomicLong(); + private final AtomicLong constraintViolationsCount = new AtomicLong(); private Timer commitPrepStat; private Timer walogWriteTimeStat; private Timer commitTimeStat; private DistributionSummary mutationArraySizeStat = new NoOpDistributionSummary(); public void addPermissionErrors(long value) { - permissionErrorsCounter.increment(value); + permissionErrorsCount.addAndGet(value); } public void addUnknownTabletErrors(long value) { - unknownTabletErrorsCounter.increment(value); + unknownTabletErrorsCount.addAndGet(value); } public void addConstraintViolations(long value) { - constraintViolationsCounter.increment(value); + constraintViolationsCount.addAndGet(value); } public void addCommitPrep(long value) { @@ -68,10 +69,14 @@ public class TabletServerUpdateMetrics implements MetricsProducer { @Override public void registerMetrics(MeterRegistry registry) { - permissionErrorsCounter = registry.counter(METRICS_UPDATE_ERRORS, "type", "permission"); - unknownTabletErrorsCounter = registry.counter(METRICS_UPDATE_ERRORS, "type", "unknown.tablet"); - constraintViolationsCounter = - registry.counter(METRICS_UPDATE_ERRORS, "type", "constraint.violation"); + FunctionCounter.builder(METRICS_UPDATE_ERRORS, permissionErrorsCount, AtomicLong::get) + .tags("type", "permission").description("Counts permission errors").register(registry); + FunctionCounter.builder(METRICS_UPDATE_ERRORS, unknownTabletErrorsCount, AtomicLong::get) + .tags("type", "unknown.tablet").description("Counts unknown tablet errors") + .register(registry); + FunctionCounter.builder(METRICS_UPDATE_ERRORS, constraintViolationsCount, AtomicLong::get) + .tags("type", "constraint.violation").description("Counts constraint violations") + .register(registry); commitPrepStat = Timer.builder(METRICS_UPDATE_COMMIT_PREP) .description("preparing to commit mutations").register(registry); walogWriteTimeStat = Timer.builder(METRICS_UPDATE_WALOG_WRITE) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java index dd23c5a12b..9b0545a5c7 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java @@ -208,7 +208,7 @@ public abstract class TabletBase { try { SortedKeyValueIterator<Key,Value> iter = new SourceSwitchingIterator(dataSource); this.lookupCount.incrementAndGet(); - this.server.getScanMetrics().incrementLookupCount(1); + this.server.getScanMetrics().incrementLookupCount(); result = lookup(iter, ranges, results, scanParams, maxResultSize); return result; } catch (IOException | RuntimeException e) {