This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new d6f823c24f Improve/standardize rate limiting logic in Monitor (#4894)
d6f823c24f is described below

commit d6f823c24f4ecfcdc4a0584c72ef9911de1c6b6c
Author: Dom G. <domgargu...@apache.org>
AuthorDate: Wed Jan 15 15:45:02 2025 -0500

    Improve/standardize rate limiting logic in Monitor (#4894)
    
    * Improve/standardize rate limiting logic in monitor
---
 .../java/org/apache/accumulo/monitor/Monitor.java  | 261 +++++++++------------
 .../rest/compactions/external/ECResource.java      |   2 +-
 2 files changed, 117 insertions(+), 146 deletions(-)

diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
index 17ed173714..548d2a985c 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.monitor;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.concurrent.TimeUnit.HOURS;
+import static java.util.concurrent.TimeUnit.MINUTES;
 import static org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly;
 
 import java.net.InetAddress;
@@ -27,10 +28,10 @@ import java.net.URL;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -75,6 +76,7 @@ import org.apache.accumulo.core.util.HostAndPort;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.ServerServices;
 import org.apache.accumulo.core.util.ServerServices.Service;
+import org.apache.accumulo.core.util.Timer;
 import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
 import org.apache.accumulo.core.util.threads.Threads;
 import org.apache.accumulo.monitor.rest.compactions.external.ExternalCompactionInfo;
@@ -178,7 +180,7 @@ public class Monitor extends AbstractServer implements HighlyAvailableService {
   private Exception problemException;
   private GCStatus gcStatus;
   private Optional<HostAndPort> coordinatorHost = Optional.empty();
-  private long coordinatorCheckNanos = 0L;
+  private Timer coordinatorCheck = null;
   private CompactionCoordinatorService.Client coordinatorClient;
   private final String coordinatorMissingMsg =
       "Error getting the compaction coordinator. Check that it is running. It is not "
@@ -388,11 +390,10 @@ public class Monitor extends AbstractServer implements HighlyAvailableService {
       }
 
       // check for compaction coordinator host and only notify its discovery
-      Optional<HostAndPort> previousHost;
-      if (System.nanoTime() - coordinatorCheckNanos > fetchTimeNanos) {
-        previousHost = coordinatorHost;
+      if (coordinatorCheck == null || coordinatorCheck.hasElapsed(expirationTimeMinutes, MINUTES)) {
+        Optional<HostAndPort> previousHost = coordinatorHost;
         coordinatorHost = ExternalCompactionUtil.findCompactionCoordinator(context);
-        coordinatorCheckNanos = System.nanoTime();
+        coordinatorCheck = Timer.startNew();
         if (previousHost.isEmpty() && coordinatorHost.isPresent()) {
           log.info("External Compaction Coordinator found at {}", coordinatorHost.orElseThrow());
         }
@@ -611,112 +612,78 @@ public class Monitor extends AbstractServer implements HighlyAvailableService {
     }
   }
 
-  private final Map<HostAndPort,ScanStats> tserverScans = new HashMap<>();
-  private final Map<HostAndPort,ScanStats> sserverScans = new HashMap<>();
-  private final Map<HostAndPort,CompactionStats> allCompactions = new HashMap<>();
+  private final long expirationTimeMinutes = 1;
+
+  // Use Suppliers.memoizeWithExpiration() to cache the results of expensive fetch operations. This
+  // avoids unnecessary repeated fetches within the expiration period and ensures that multiple
+  // requests around the same time use the same cached data.
+  private final Supplier<Map<HostAndPort,ScanStats>> tserverScansSupplier =
+      Suppliers.memoizeWithExpiration(this::fetchTServerScans, expirationTimeMinutes, MINUTES);
+
+  private final Supplier<Map<HostAndPort,ScanStats>> sserverScansSupplier =
+      Suppliers.memoizeWithExpiration(this::fetchSServerScans, expirationTimeMinutes, MINUTES);
+
+  private final Supplier<Map<HostAndPort,CompactionStats>> compactionsSupplier =
+      Suppliers.memoizeWithExpiration(this::fetchCompactions, expirationTimeMinutes, MINUTES);
+
+  private final Supplier<ExternalCompactionInfo> compactorInfoSupplier =
+      Suppliers.memoizeWithExpiration(this::fetchCompactorsInfo, expirationTimeMinutes, MINUTES);
+
+  private final Supplier<ExternalCompactionsSnapshot> externalCompactionsSupplier =
+      Suppliers.memoizeWithExpiration(this::computeExternalCompactionsSnapshot,
+          expirationTimeMinutes, MINUTES);
+
   private final RecentLogs recentLogs = new RecentLogs();
-  private final ExternalCompactionInfo ecInfo = new ExternalCompactionInfo();
-
-  private long scansFetchedNanos = System.nanoTime();
-  private long compactsFetchedNanos = System.nanoTime();
-  private long ecInfoFetchedNanos = System.nanoTime();
-  private final long fetchTimeNanos = TimeUnit.MINUTES.toNanos(1);
-  private final long ageOffEntriesMillis = TimeUnit.MINUTES.toMillis(15);
-  // When there are a large amount of external compactions running the list of external compactions
-  // could consume a lot of memory. The purpose of this memoizing supplier is to try to avoid
-  // creating the list of running external compactions in memory per web request. If multiple
-  // request come in around the same time they should use the same list. It is still possible to
-  // have multiple list in memory if one request obtains a copy and then another request comes in
-  // after the timeout and the supplier recomputes the list. The longer the timeout on the supplier
-  // is the less likely we are to have multiple list of external compactions in memory, however
-  // increasing the timeout will make the monitor less responsive.
-  private final Supplier<ExternalCompactionsSnapshot> extCompactionSnapshot =
-      Suppliers.memoizeWithExpiration(() -> computeExternalCompactionsSnapshot(), fetchTimeNanos,
-          TimeUnit.NANOSECONDS);
 
   /**
-   * Fetch the active scans but only if fetchTimeNanos has elapsed.
+   * @return active tablet server scans. Values are cached and refresh after
+   *         {@link #expirationTimeMinutes}.
    */
-  public synchronized Map<HostAndPort,ScanStats> getScans() {
-    if (System.nanoTime() - scansFetchedNanos > fetchTimeNanos) {
-      log.info("User initiated fetch of Active TabletServer Scans");
-      fetchScans();
-    }
-    return Map.copyOf(tserverScans);
+  public Map<HostAndPort,ScanStats> getScans() {
+    return tserverScansSupplier.get();
   }
 
-  public synchronized Map<HostAndPort,ScanStats> getScanServerScans() {
-    if (System.nanoTime() - scansFetchedNanos > fetchTimeNanos) {
-      log.info("User initiated fetch of Active ScanServer Scans");
-      fetchScans();
-    }
-    return Map.copyOf(sserverScans);
+  /**
+   * @return active scan server scans. Values are cached and refresh after
+   *         {@link #expirationTimeMinutes}.
+   */
+  public Map<HostAndPort,ScanStats> getScanServerScans() {
+    return sserverScansSupplier.get();
   }
 
   /**
-   * Fetch the active compactions but only if fetchTimeNanos has elapsed.
+   * @return active compactions. Values are cached and refresh after {@link #expirationTimeMinutes}.
    */
-  public synchronized Map<HostAndPort,CompactionStats> getCompactions() {
-    if (System.nanoTime() - compactsFetchedNanos > fetchTimeNanos) {
-      log.info("User initiated fetch of Active Compactions");
-      fetchCompactions();
-    }
-    return Map.copyOf(allCompactions);
+  public Map<HostAndPort,CompactionStats> getCompactions() {
+    return compactionsSupplier.get();
   }
 
-  public synchronized ExternalCompactionInfo getCompactorsInfo() {
+  /**
+   * @return external compaction information. Values are cached and refresh after
+   *         {@link #expirationTimeMinutes}.
+   */
+  public ExternalCompactionInfo getCompactorsInfo() {
     if (coordinatorHost.isEmpty()) {
       throw new IllegalStateException("Tried fetching from compaction coordinator that's missing");
     }
-    if (System.nanoTime() - ecInfoFetchedNanos > fetchTimeNanos) {
-      log.info("User initiated fetch of External Compaction info");
-      Map<String,List<HostAndPort>> compactors =
-          ExternalCompactionUtil.getCompactorAddrs(getContext());
-      log.debug("Found compactors: " + compactors);
-      ecInfo.setFetchedTimeMillis(System.currentTimeMillis());
-      ecInfo.setCompactors(compactors);
-      ecInfo.setCoordinatorHost(coordinatorHost);
-
-      ecInfoFetchedNanos = System.nanoTime();
-    }
-    return ecInfo;
+    return compactorInfoSupplier.get();
   }
 
-  private static class ExternalCompactionsSnapshot {
-    public final RunningCompactions runningCompactions;
-    public final Map<String,TExternalCompaction> ecRunningMap;
-
-    private ExternalCompactionsSnapshot(Optional<Map<String,TExternalCompaction>> ecRunningMapOpt) {
-      this.ecRunningMap =
-          ecRunningMapOpt.map(Collections::unmodifiableMap).orElse(Collections.emptyMap());
-      this.runningCompactions = new RunningCompactions(this.ecRunningMap);
-    }
-  }
-
-  private ExternalCompactionsSnapshot computeExternalCompactionsSnapshot() {
-    if (coordinatorHost.isEmpty()) {
-      throw new IllegalStateException(coordinatorMissingMsg);
-    }
-    var ccHost = coordinatorHost.orElseThrow();
-    log.info("User initiated fetch of running External Compactions from " + ccHost);
-    var client = getCoordinator(ccHost);
-    TExternalCompactionList running;
-    try {
-      running = client.getRunningCompactions(TraceUtil.traceInfo(), getContext().rpcCreds());
-    } catch (Exception e) {
-      throw new IllegalStateException("Unable to get running compactions from " + ccHost, e);
-    }
-
-    return new ExternalCompactionsSnapshot(Optional.ofNullable(running.getCompactions()));
-  }
-
-  public RunningCompactions getRunnningCompactions() {
-    return extCompactionSnapshot.get().runningCompactions;
+  /**
+   * @return running compactions. Values are cached and refresh after
+   *         {@link #expirationTimeMinutes}.
+   */
+  public RunningCompactions getRunningCompactions() {
+    return externalCompactionsSupplier.get().runningCompactions;
   }
 
+  /**
+   * @return running compactor details. Values are cached and refresh after
+   *         {@link #expirationTimeMinutes}.
+   */
   public RunningCompactorDetails getRunningCompactorDetails(ExternalCompactionId ecid) {
     TExternalCompaction extCompaction =
-        extCompactionSnapshot.get().ecRunningMap.get(ecid.canonical());
+        externalCompactionsSupplier.get().ecRunningMap.get(ecid.canonical());
     if (extCompaction == null) {
       return null;
     }
@@ -736,61 +703,36 @@ public class Monitor extends AbstractServer implements HighlyAvailableService {
     return coordinatorClient;
   }
 
-  private void fetchScans() {
+  private Map<HostAndPort,ScanStats> fetchScans(Collection<String> servers) {
     ServerContext context = getContext();
-    for (String server : context.instanceOperations().getTabletServers()) {
+    Map<HostAndPort,ScanStats> scans = new HashMap<>();
+    for (String server : servers) {
       final HostAndPort parsedServer = HostAndPort.fromString(server);
-      TabletScanClientService.Client tserver = null;
+      TabletScanClientService.Client client = null;
       try {
-        tserver = ThriftUtil.getClient(ThriftClientTypes.TABLET_SCAN, parsedServer, context);
-        List<ActiveScan> scans = tserver.getActiveScans(null, context.rpcCreds());
-        tserverScans.put(parsedServer, new ScanStats(scans));
-        scansFetchedNanos = System.nanoTime();
+        client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SCAN, parsedServer, context);
+        List<ActiveScan> activeScans = client.getActiveScans(null, context.rpcCreds());
+        scans.put(parsedServer, new ScanStats(activeScans));
       } catch (Exception ex) {
         log.error("Failed to get active scans from {}", server, ex);
       } finally {
-        ThriftUtil.returnClient(tserver, context);
-      }
-    }
-    // Age off old scan information
-    Iterator<Entry<HostAndPort,ScanStats>> tserverIter = tserverScans.entrySet().iterator();
-    // clock time used for fetched for date friendly display
-    long now = System.currentTimeMillis();
-    while (tserverIter.hasNext()) {
-      Entry<HostAndPort,ScanStats> entry = tserverIter.next();
-      if (now - entry.getValue().fetched > ageOffEntriesMillis) {
-        tserverIter.remove();
-      }
-    }
-    // Scan Servers
-    for (String server : context.instanceOperations().getScanServers()) {
-      final HostAndPort parsedServer = HostAndPort.fromString(server);
-      TabletScanClientService.Client sserver = null;
-      try {
-        sserver = ThriftUtil.getClient(ThriftClientTypes.TABLET_SCAN, parsedServer, context);
-        List<ActiveScan> scans = sserver.getActiveScans(null, context.rpcCreds());
-        sserverScans.put(parsedServer, new ScanStats(scans));
-        scansFetchedNanos = System.nanoTime();
-      } catch (Exception ex) {
-        log.error("Failed to get active scans from {}", server, ex);
-      } finally {
-        ThriftUtil.returnClient(sserver, context);
-      }
-    }
-    // Age off old scan information
-    Iterator<Entry<HostAndPort,ScanStats>> sserverIter = sserverScans.entrySet().iterator();
-    // clock time used for fetched for date friendly display
-    now = System.currentTimeMillis();
-    while (sserverIter.hasNext()) {
-      Entry<HostAndPort,ScanStats> entry = sserverIter.next();
-      if (now - entry.getValue().fetched > ageOffEntriesMillis) {
-        sserverIter.remove();
+        ThriftUtil.returnClient(client, context);
       }
     }
+    return Collections.unmodifiableMap(scans);
   }
 
-  private void fetchCompactions() {
+  private Map<HostAndPort,ScanStats> fetchTServerScans() {
+    return fetchScans(getContext().instanceOperations().getTabletServers());
+  }
+
+  private Map<HostAndPort,ScanStats> fetchSServerScans() {
+    return fetchScans(getContext().instanceOperations().getScanServers());
+  }
+
+  private Map<HostAndPort,CompactionStats> fetchCompactions() {
     ServerContext context = getContext();
+    Map<HostAndPort,CompactionStats> allCompactions = new HashMap<>();
     for (String server : context.instanceOperations().getTabletServers()) {
       final HostAndPort parsedServer = HostAndPort.fromString(server);
       Client tserver = null;
@@ -798,23 +740,52 @@ public class Monitor extends AbstractServer implements HighlyAvailableService {
         tserver = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, parsedServer, context);
         var compacts = tserver.getActiveCompactions(null, context.rpcCreds());
         allCompactions.put(parsedServer, new CompactionStats(compacts));
-        compactsFetchedNanos = System.nanoTime();
       } catch (Exception ex) {
         log.debug("Failed to get active compactions from {}", server, ex);
       } finally {
         ThriftUtil.returnClient(tserver, context);
       }
     }
-    // Age off old compaction information
-    var entryIter = allCompactions.entrySet().iterator();
-    // clock time used for fetched for date friendly display
-    long now = System.currentTimeMillis();
-    while (entryIter.hasNext()) {
-      var entry = entryIter.next();
-      if (now - entry.getValue().fetched > ageOffEntriesMillis) {
-        entryIter.remove();
-      }
+    return Collections.unmodifiableMap(allCompactions);
+  }
+
+  private ExternalCompactionInfo fetchCompactorsInfo() {
+    ServerContext context = getContext();
+    Map<String,List<HostAndPort>> compactors = ExternalCompactionUtil.getCompactorAddrs(context);
+    log.debug("Found compactors: {}", compactors);
+    ExternalCompactionInfo ecInfo = new ExternalCompactionInfo();
+    ecInfo.setFetchedTimeMillis(System.currentTimeMillis());
+    ecInfo.setCompactors(compactors);
+    ecInfo.setCoordinatorHost(coordinatorHost);
+    return ecInfo;
+  }
+
+  private static class ExternalCompactionsSnapshot {
+    public final RunningCompactions runningCompactions;
+    public final Map<String,TExternalCompaction> ecRunningMap;
+
+    private ExternalCompactionsSnapshot(Optional<Map<String,TExternalCompaction>> ecRunningMapOpt) {
+      this.ecRunningMap =
+          ecRunningMapOpt.map(Collections::unmodifiableMap).orElse(Collections.emptyMap());
+      this.runningCompactions = new RunningCompactions(this.ecRunningMap);
+    }
+  }
+
+  private ExternalCompactionsSnapshot computeExternalCompactionsSnapshot() {
+    if (coordinatorHost.isEmpty()) {
+      throw new IllegalStateException(coordinatorMissingMsg);
+    }
+    var ccHost = coordinatorHost.orElseThrow();
+    log.info("User initiated fetch of running External Compactions from " + ccHost);
+    var client = getCoordinator(ccHost);
+    TExternalCompactionList running;
+    try {
+      running = client.getRunningCompactions(TraceUtil.traceInfo(), getContext().rpcCreds());
+    } catch (Exception e) {
+      throw new IllegalStateException("Unable to get running compactions from " + ccHost, e);
     }
+
+    return new ExternalCompactionsSnapshot(Optional.ofNullable(running.getCompactions()));
   }
 
   /**
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java
index 72d54d70a4..5fcecef349 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java
@@ -60,7 +60,7 @@ public class ECResource {
   @Path("running")
   @GET
   public RunningCompactions getRunning() {
-    return monitor.getRunnningCompactions();
+    return monitor.getRunningCompactions();
   }
 
   @Path("details")

Reply via email to