This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
     new d6f823c24f Improve/standardize rate limiting logic in Monitor (#4894)
d6f823c24f is described below

commit d6f823c24f4ecfcdc4a0584c72ef9911de1c6b6c
Author: Dom G. <domgargu...@apache.org>
AuthorDate: Wed Jan 15 15:45:02 2025 -0500

    Improve/standardize rate limiting logic in Monitor (#4894)

    * Improve/standardize rate limiting logic in monitor
---
 .../java/org/apache/accumulo/monitor/Monitor.java | 261 +++++++++------------
 .../rest/compactions/external/ECResource.java     |   2 +-
 2 files changed, 117 insertions(+), 146 deletions(-)

diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
index 17ed173714..548d2a985c 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.monitor;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.concurrent.TimeUnit.HOURS;
+import static java.util.concurrent.TimeUnit.MINUTES;
 import static org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly;
 
 import java.net.InetAddress;
@@ -27,10 +28,10 @@ import java.net.URL;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -75,6 +76,7 @@ import org.apache.accumulo.core.util.HostAndPort;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.ServerServices;
 import org.apache.accumulo.core.util.ServerServices.Service;
+import org.apache.accumulo.core.util.Timer;
 import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
 import org.apache.accumulo.core.util.threads.Threads;
 import org.apache.accumulo.monitor.rest.compactions.external.ExternalCompactionInfo;
@@ -178,7 +180,7 @@ public class Monitor extends AbstractServer implements HighlyAvailableService {
   private Exception problemException;
   private GCStatus gcStatus;
   private Optional<HostAndPort> coordinatorHost = Optional.empty();
-  private long coordinatorCheckNanos = 0L;
+  private Timer coordinatorCheck = null;
   private CompactionCoordinatorService.Client coordinatorClient;
   private final String coordinatorMissingMsg =
       "Error getting the compaction coordinator. Check that it is running. It is not "
@@ -388,11 +390,10 @@ public class Monitor extends AbstractServer implements HighlyAvailableService {
       }
 
       // check for compaction coordinator host and only notify its discovery
-      Optional<HostAndPort> previousHost;
-      if (System.nanoTime() - coordinatorCheckNanos > fetchTimeNanos) {
-        previousHost = coordinatorHost;
+      if (coordinatorCheck == null || coordinatorCheck.hasElapsed(expirationTimeMinutes, MINUTES)) {
+        Optional<HostAndPort> previousHost = coordinatorHost;
         coordinatorHost = ExternalCompactionUtil.findCompactionCoordinator(context);
-        coordinatorCheckNanos = System.nanoTime();
+        coordinatorCheck = Timer.startNew();
         if (previousHost.isEmpty() && coordinatorHost.isPresent()) {
           log.info("External Compaction Coordinator found at {}", coordinatorHost.orElseThrow());
         }
@@ -611,112 +612,78 @@ public class Monitor extends AbstractServer implements HighlyAvailableService {
     }
   }
 
-  private final Map<HostAndPort,ScanStats> tserverScans = new HashMap<>();
-  private final Map<HostAndPort,ScanStats> sserverScans = new HashMap<>();
-  private final Map<HostAndPort,CompactionStats> allCompactions = new HashMap<>();
+  private final long expirationTimeMinutes = 1;
+
+  // Use Suppliers.memoizeWithExpiration() to cache the results of expensive fetch operations. This
+  // avoids unnecessary repeated fetches within the expiration period and ensures that multiple
+  // requests around the same time use the same cached data.
+  private final Supplier<Map<HostAndPort,ScanStats>> tserverScansSupplier =
+      Suppliers.memoizeWithExpiration(this::fetchTServerScans, expirationTimeMinutes, MINUTES);
+
+  private final Supplier<Map<HostAndPort,ScanStats>> sserverScansSupplier =
+      Suppliers.memoizeWithExpiration(this::fetchSServerScans, expirationTimeMinutes, MINUTES);
+
+  private final Supplier<Map<HostAndPort,CompactionStats>> compactionsSupplier =
+      Suppliers.memoizeWithExpiration(this::fetchCompactions, expirationTimeMinutes, MINUTES);
+
+  private final Supplier<ExternalCompactionInfo> compactorInfoSupplier =
+      Suppliers.memoizeWithExpiration(this::fetchCompactorsInfo, expirationTimeMinutes, MINUTES);
+
+  private final Supplier<ExternalCompactionsSnapshot> externalCompactionsSupplier =
+      Suppliers.memoizeWithExpiration(this::computeExternalCompactionsSnapshot,
+          expirationTimeMinutes, MINUTES);
+
   private final RecentLogs recentLogs = new RecentLogs();
-  private final ExternalCompactionInfo ecInfo = new ExternalCompactionInfo();
-
-  private long scansFetchedNanos = System.nanoTime();
-  private long compactsFetchedNanos = System.nanoTime();
-  private long ecInfoFetchedNanos = System.nanoTime();
-  private final long fetchTimeNanos = TimeUnit.MINUTES.toNanos(1);
-  private final long ageOffEntriesMillis = TimeUnit.MINUTES.toMillis(15);
-  // When there are a large amount of external compactions running the list of external compactions
-  // could consume a lot of memory. The purpose of this memoizing supplier is to try to avoid
-  // creating the list of running external compactions in memory per web request. If multiple
-  // request come in around the same time they should use the same list. It is still possible to
-  // have multiple list in memory if one request obtains a copy and then another request comes in
-  // after the timeout and the supplier recomputes the list. The longer the timeout on the supplier
-  // is the less likely we are to have multiple list of external compactions in memory, however
-  // increasing the timeout will make the monitor less responsive.
-  private final Supplier<ExternalCompactionsSnapshot> extCompactionSnapshot =
-      Suppliers.memoizeWithExpiration(() -> computeExternalCompactionsSnapshot(), fetchTimeNanos,
-          TimeUnit.NANOSECONDS);
 
   /**
-   * Fetch the active scans but only if fetchTimeNanos has elapsed.
+   * @return active tablet server scans. Values are cached and refresh after
+   *         {@link #expirationTimeMinutes}.
    */
-  public synchronized Map<HostAndPort,ScanStats> getScans() {
-    if (System.nanoTime() - scansFetchedNanos > fetchTimeNanos) {
-      log.info("User initiated fetch of Active TabletServer Scans");
-      fetchScans();
-    }
-    return Map.copyOf(tserverScans);
+  public Map<HostAndPort,ScanStats> getScans() {
+    return tserverScansSupplier.get();
   }
 
-  public synchronized Map<HostAndPort,ScanStats> getScanServerScans() {
-    if (System.nanoTime() - scansFetchedNanos > fetchTimeNanos) {
-      log.info("User initiated fetch of Active ScanServer Scans");
-      fetchScans();
-    }
-    return Map.copyOf(sserverScans);
+  /**
+   * @return active scan server scans. Values are cached and refresh after
+   *         {@link #expirationTimeMinutes}.
+   */
+  public Map<HostAndPort,ScanStats> getScanServerScans() {
+    return sserverScansSupplier.get();
   }
 
   /**
-   * Fetch the active compactions but only if fetchTimeNanos has elapsed.
+   * @return active compactions. Values are cached and refresh after {@link #expirationTimeMinutes}.
    */
-  public synchronized Map<HostAndPort,CompactionStats> getCompactions() {
-    if (System.nanoTime() - compactsFetchedNanos > fetchTimeNanos) {
-      log.info("User initiated fetch of Active Compactions");
-      fetchCompactions();
-    }
-    return Map.copyOf(allCompactions);
+  public Map<HostAndPort,CompactionStats> getCompactions() {
+    return compactionsSupplier.get();
   }
 
-  public synchronized ExternalCompactionInfo getCompactorsInfo() {
+  /**
+   * @return external compaction information. Values are cached and refresh after
+   *         {@link #expirationTimeMinutes}.
+   */
+  public ExternalCompactionInfo getCompactorsInfo() {
     if (coordinatorHost.isEmpty()) {
       throw new IllegalStateException("Tried fetching from compaction coordinator that's missing");
     }
-    if (System.nanoTime() - ecInfoFetchedNanos > fetchTimeNanos) {
-      log.info("User initiated fetch of External Compaction info");
-      Map<String,List<HostAndPort>> compactors =
-          ExternalCompactionUtil.getCompactorAddrs(getContext());
-      log.debug("Found compactors: " + compactors);
-      ecInfo.setFetchedTimeMillis(System.currentTimeMillis());
-      ecInfo.setCompactors(compactors);
-      ecInfo.setCoordinatorHost(coordinatorHost);
-
-      ecInfoFetchedNanos = System.nanoTime();
-    }
-    return ecInfo;
+    return compactorInfoSupplier.get();
   }
 
-  private static class ExternalCompactionsSnapshot {
-    public final RunningCompactions runningCompactions;
-    public final Map<String,TExternalCompaction> ecRunningMap;
-
-    private ExternalCompactionsSnapshot(Optional<Map<String,TExternalCompaction>> ecRunningMapOpt) {
-      this.ecRunningMap =
-          ecRunningMapOpt.map(Collections::unmodifiableMap).orElse(Collections.emptyMap());
-      this.runningCompactions = new RunningCompactions(this.ecRunningMap);
-    }
-  }
-
-  private ExternalCompactionsSnapshot computeExternalCompactionsSnapshot() {
-    if (coordinatorHost.isEmpty()) {
-      throw new IllegalStateException(coordinatorMissingMsg);
-    }
-    var ccHost = coordinatorHost.orElseThrow();
-    log.info("User initiated fetch of running External Compactions from " + ccHost);
-    var client = getCoordinator(ccHost);
-    TExternalCompactionList running;
-    try {
-      running = client.getRunningCompactions(TraceUtil.traceInfo(), getContext().rpcCreds());
-    } catch (Exception e) {
-      throw new IllegalStateException("Unable to get running compactions from " + ccHost, e);
-    }
-
-    return new ExternalCompactionsSnapshot(Optional.ofNullable(running.getCompactions()));
-  }
-
-  public RunningCompactions getRunnningCompactions() {
-    return extCompactionSnapshot.get().runningCompactions;
+  /**
+   * @return running compactions. Values are cached and refresh after
+   *         {@link #expirationTimeMinutes}.
+   */
+  public RunningCompactions getRunningCompactions() {
+    return externalCompactionsSupplier.get().runningCompactions;
   }
 
+  /**
+   * @return running compactor details. Values are cached and refresh after
+   *         {@link #expirationTimeMinutes}.
+   */
   public RunningCompactorDetails getRunningCompactorDetails(ExternalCompactionId ecid) {
     TExternalCompaction extCompaction =
-        extCompactionSnapshot.get().ecRunningMap.get(ecid.canonical());
+        externalCompactionsSupplier.get().ecRunningMap.get(ecid.canonical());
     if (extCompaction == null) {
       return null;
     }
@@ -736,61 +703,36 @@ public class Monitor extends AbstractServer implements HighlyAvailableService {
     return coordinatorClient;
   }
 
-  private void fetchScans() {
+  private Map<HostAndPort,ScanStats> fetchScans(Collection<String> servers) {
     ServerContext context = getContext();
-    for (String server : context.instanceOperations().getTabletServers()) {
+    Map<HostAndPort,ScanStats> scans = new HashMap<>();
+    for (String server : servers) {
       final HostAndPort parsedServer = HostAndPort.fromString(server);
-      TabletScanClientService.Client tserver = null;
+      TabletScanClientService.Client client = null;
       try {
-        tserver = ThriftUtil.getClient(ThriftClientTypes.TABLET_SCAN, parsedServer, context);
-        List<ActiveScan> scans = tserver.getActiveScans(null, context.rpcCreds());
-        tserverScans.put(parsedServer, new ScanStats(scans));
-        scansFetchedNanos = System.nanoTime();
+        client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SCAN, parsedServer, context);
+        List<ActiveScan> activeScans = client.getActiveScans(null, context.rpcCreds());
+        scans.put(parsedServer, new ScanStats(activeScans));
       } catch (Exception ex) {
         log.error("Failed to get active scans from {}", server, ex);
       } finally {
-        ThriftUtil.returnClient(tserver, context);
-      }
-    }
-    // Age off old scan information
-    Iterator<Entry<HostAndPort,ScanStats>> tserverIter = tserverScans.entrySet().iterator();
-    // clock time used for fetched for date friendly display
-    long now = System.currentTimeMillis();
-    while (tserverIter.hasNext()) {
-      Entry<HostAndPort,ScanStats> entry = tserverIter.next();
-      if (now - entry.getValue().fetched > ageOffEntriesMillis) {
-        tserverIter.remove();
-      }
-    }
-    // Scan Servers
-    for (String server : context.instanceOperations().getScanServers()) {
-      final HostAndPort parsedServer = HostAndPort.fromString(server);
-      TabletScanClientService.Client sserver = null;
-      try {
-        sserver = ThriftUtil.getClient(ThriftClientTypes.TABLET_SCAN, parsedServer, context);
-        List<ActiveScan> scans = sserver.getActiveScans(null, context.rpcCreds());
-        sserverScans.put(parsedServer, new ScanStats(scans));
-        scansFetchedNanos = System.nanoTime();
-      } catch (Exception ex) {
-        log.error("Failed to get active scans from {}", server, ex);
-      } finally {
-        ThriftUtil.returnClient(sserver, context);
-      }
-    }
-    // Age off old scan information
-    Iterator<Entry<HostAndPort,ScanStats>> sserverIter = sserverScans.entrySet().iterator();
-    // clock time used for fetched for date friendly display
-    now = System.currentTimeMillis();
-    while (sserverIter.hasNext()) {
-      Entry<HostAndPort,ScanStats> entry = sserverIter.next();
-      if (now - entry.getValue().fetched > ageOffEntriesMillis) {
-        sserverIter.remove();
+        ThriftUtil.returnClient(client, context);
       }
     }
+    return Collections.unmodifiableMap(scans);
   }
 
-  private void fetchCompactions() {
+  private Map<HostAndPort,ScanStats> fetchTServerScans() {
+    return fetchScans(getContext().instanceOperations().getTabletServers());
+  }
+
+  private Map<HostAndPort,ScanStats> fetchSServerScans() {
+    return fetchScans(getContext().instanceOperations().getScanServers());
+  }
+
+  private Map<HostAndPort,CompactionStats> fetchCompactions() {
     ServerContext context = getContext();
+    Map<HostAndPort,CompactionStats> allCompactions = new HashMap<>();
     for (String server : context.instanceOperations().getTabletServers()) {
       final HostAndPort parsedServer = HostAndPort.fromString(server);
       Client tserver = null;
@@ -798,23 +740,52 @@ public class Monitor extends AbstractServer implements HighlyAvailableService {
         tserver = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, parsedServer, context);
         var compacts = tserver.getActiveCompactions(null, context.rpcCreds());
         allCompactions.put(parsedServer, new CompactionStats(compacts));
-        compactsFetchedNanos = System.nanoTime();
       } catch (Exception ex) {
         log.debug("Failed to get active compactions from {}", server, ex);
       } finally {
         ThriftUtil.returnClient(tserver, context);
       }
     }
-    // Age off old compaction information
-    var entryIter = allCompactions.entrySet().iterator();
-    // clock time used for fetched for date friendly display
-    long now = System.currentTimeMillis();
-    while (entryIter.hasNext()) {
-      var entry = entryIter.next();
-      if (now - entry.getValue().fetched > ageOffEntriesMillis) {
-        entryIter.remove();
-      }
+    return Collections.unmodifiableMap(allCompactions);
+  }
+
+  private ExternalCompactionInfo fetchCompactorsInfo() {
+    ServerContext context = getContext();
+    Map<String,List<HostAndPort>> compactors = ExternalCompactionUtil.getCompactorAddrs(context);
+    log.debug("Found compactors: {}", compactors);
+    ExternalCompactionInfo ecInfo = new ExternalCompactionInfo();
+    ecInfo.setFetchedTimeMillis(System.currentTimeMillis());
+    ecInfo.setCompactors(compactors);
+    ecInfo.setCoordinatorHost(coordinatorHost);
+    return ecInfo;
+  }
+
+  private static class ExternalCompactionsSnapshot {
+    public final RunningCompactions runningCompactions;
+    public final Map<String,TExternalCompaction> ecRunningMap;
+
+    private ExternalCompactionsSnapshot(Optional<Map<String,TExternalCompaction>> ecRunningMapOpt) {
+      this.ecRunningMap =
+          ecRunningMapOpt.map(Collections::unmodifiableMap).orElse(Collections.emptyMap());
+      this.runningCompactions = new RunningCompactions(this.ecRunningMap);
+    }
+  }
+
+  private ExternalCompactionsSnapshot computeExternalCompactionsSnapshot() {
+    if (coordinatorHost.isEmpty()) {
+      throw new IllegalStateException(coordinatorMissingMsg);
+    }
+    var ccHost = coordinatorHost.orElseThrow();
+    log.info("User initiated fetch of running External Compactions from " + ccHost);
+    var client = getCoordinator(ccHost);
+    TExternalCompactionList running;
+    try {
+      running = client.getRunningCompactions(TraceUtil.traceInfo(), getContext().rpcCreds());
+    } catch (Exception e) {
+      throw new IllegalStateException("Unable to get running compactions from " + ccHost, e);
     }
+
+    return new ExternalCompactionsSnapshot(Optional.ofNullable(running.getCompactions()));
   }
 
   /**
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java
index 72d54d70a4..5fcecef349 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java
@@ -60,7 +60,7 @@ public class ECResource {
   @Path("running")
   @GET
   public RunningCompactions getRunning() {
-    return monitor.getRunnningCompactions();
+    return monitor.getRunningCompactions();
   }
 
   @Path("details")
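For context on the caching pattern this commit adopts: the hand-rolled System.nanoTime() bookkeeping is replaced by Guava's Suppliers.memoizeWithExpiration(), so each expensive fetch runs at most once per expiration window and concurrent web requests share the same cached result. Below is a minimal standalone sketch of that pattern, assuming Guava is on the classpath; the class name CachedFetchExample and the fetchStatus() method are hypothetical illustrations, not code from the commit.

// Illustrative sketch only, not part of the commit above. It mirrors the idea used in
// Monitor.java: wrap an expensive fetch in Suppliers.memoizeWithExpiration so callers
// within the expiration window share one cached value.
import static java.util.concurrent.TimeUnit.MINUTES;

import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;

public class CachedFetchExample {

  // Refresh at most once per minute, mirroring expirationTimeMinutes in the commit.
  private static final long EXPIRATION_MINUTES = 1;

  // All callers of getStatus() within the same minute see the same cached value;
  // the first caller after expiration pays the cost of recomputing it.
  private final Supplier<String> statusSupplier =
      Suppliers.memoizeWithExpiration(this::fetchStatus, EXPIRATION_MINUTES, MINUTES);

  // Stand-in for an expensive remote fetch (e.g. an RPC to every tablet server).
  private String fetchStatus() {
    return "status computed at " + System.currentTimeMillis();
  }

  public String getStatus() {
    return statusSupplier.get();
  }

  public static void main(String[] args) {
    CachedFetchExample example = new CachedFetchExample();
    // Both calls return the same cached value because they fall inside the window.
    System.out.println(example.getStatus());
    System.out.println(example.getStatus());
  }
}

The trade-off described in the removed comment still applies to this pattern: a longer expiration keeps fewer concurrent copies of the fetched data in memory, but makes the monitor's view of the cluster staler.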