This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit deefe4d1e68a9bc0c79fd0647cc5243238f60de0
Merge: 98cbd85f85 22831aae20
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Thu Jan 16 23:50:31 2025 +0000

    Merge branch '2.1' into 3.1

 .../org/apache/accumulo/core/file/rfile/RFile.java |  21 +-
 .../java/org/apache/accumulo/monitor/Monitor.java  | 260 +++++++++------------
 .../rest/compactions/external/ECResource.java      |   2 +-
 .../accumulo/shell/commands/HelpCommand.java       |   3 +-
 4 files changed, 133 insertions(+), 153 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
index 150fcf4757,68e2be016d..8737c67241
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
@@@ -1305,9 -1303,13 +1309,13 @@@ public class RFile 
      }
  
      @Override
-     public void closeDeepCopies() {
+     public void closeDeepCopies() throws IOException {
+       closeDeepCopies(false);
+     }
+ 
+     private void closeDeepCopies(boolean ignoreIOExceptions) throws IOException {
        if (deepCopy) {
 -        throw new RuntimeException("Calling closeDeepCopies on a deep copy is not supported");
 +        throw new IllegalStateException("Calling closeDeepCopies on a deep copy is not supported");
        }
  
        for (Reader deepCopy : deepCopies) {
@@@ -1320,11 -1322,12 +1328,12 @@@
      @Override
      public void close() throws IOException {
        if (deepCopy) {
 -        throw new RuntimeException("Calling close on a deep copy is not supported");
 +        throw new IllegalStateException("Calling close on a deep copy is not supported");
        }
  
-       closeDeepCopies();
-       closeLocalityGroupReaders();
+       // Closes as much as possible, ignoring and logging exceptions along the way
+       closeDeepCopies(true);
+       closeLocalityGroupReaders(true);
  
        if (sampleReaders != null) {
          for (LocalityGroupReader lgr : sampleReaders) {
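
The close() change above switches to best-effort cleanup: it attempts to
release every reader, logging IOExceptions rather than propagating them, so
one failing resource cannot prevent the rest from being closed. A minimal
sketch of that pattern, assuming hypothetical names (BestEffortCloser and its
resources list are illustrative, not from the Accumulo source):

  import java.io.Closeable;
  import java.io.IOException;
  import java.util.List;

  class BestEffortCloser {
    private final List<Closeable> resources;

    BestEffortCloser(List<Closeable> resources) {
      this.resources = resources;
    }

    // When ignoreIOExceptions is true, try to close every resource and only
    // log failures; when false, rethrow the first failure immediately.
    void close(boolean ignoreIOExceptions) throws IOException {
      for (Closeable resource : resources) {
        try {
          resource.close();
        } catch (IOException e) {
          if (!ignoreIOExceptions) {
            throw e;
          }
          // In the real reader this would go to an SLF4J logger.
          System.err.println("Ignoring exception while closing: " + e);
        }
      }
    }
  }
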
diff --cc server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
index cb09d74095,548d2a985c..2fe6e2d2b7
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
@@@ -18,17 -18,21 +18,18 @@@
   */
  package org.apache.accumulo.monitor;
  
 +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
  import static java.nio.charset.StandardCharsets.UTF_8;
 -import static java.util.concurrent.TimeUnit.HOURS;
+ import static java.util.concurrent.TimeUnit.MINUTES;
 -import static org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly;
  
  import java.net.InetAddress;
  import java.net.URL;
  import java.net.UnknownHostException;
 -import java.util.ArrayList;
  import java.util.Arrays;
+ import java.util.Collection;
  import java.util.Collections;
  import java.util.HashMap;
  import java.util.HashSet;
- import java.util.Iterator;
 -import java.util.LinkedList;
  import java.util.List;
  import java.util.Map;
  import java.util.Map.Entry;
@@@ -65,12 -67,16 +66,13 @@@ import org.apache.accumulo.core.metadat
  import org.apache.accumulo.core.metrics.MetricsInfo;
  import org.apache.accumulo.core.rpc.ThriftUtil;
  import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
 +import org.apache.accumulo.core.tabletscan.thrift.ActiveScan;
 +import org.apache.accumulo.core.tabletscan.thrift.TabletScanClientService;
  import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction;
 -import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
 -import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
 -import org.apache.accumulo.core.tabletserver.thrift.TabletScanClientService;
 +import org.apache.accumulo.core.tabletserver.thrift.TabletServerClientService.Client;
  import org.apache.accumulo.core.trace.TraceUtil;
 -import org.apache.accumulo.core.util.HostAndPort;
  import org.apache.accumulo.core.util.Pair;
 -import org.apache.accumulo.core.util.ServerServices;
 -import org.apache.accumulo.core.util.ServerServices.Service;
+ import org.apache.accumulo.core.util.Timer;
  import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
  import org.apache.accumulo.core.util.threads.Threads;
  import 
org.apache.accumulo.monitor.rest.compactions.external.ExternalCompactionInfo;
@@@ -138,9 -176,11 +140,9 @@@ public class Monitor extends AbstractSe
  
    private final AtomicBoolean fetching = new AtomicBoolean(false);
    private ManagerMonitorInfo mmi;
 -  private Map<TableId,Map<ProblemType,Integer>> problemSummary = Collections.emptyMap();
 -  private Exception problemException;
    private GCStatus gcStatus;
    private Optional<HostAndPort> coordinatorHost = Optional.empty();
-   private long coordinatorCheckNanos = 0L;
+   private Timer coordinatorCheck = null;
    private CompactionCoordinatorService.Client coordinatorClient;
    private final String coordinatorMissingMsg =
        "Error getting the compaction coordinator. Check that it is running. It 
is not "
@@@ -507,45 -612,43 +508,41 @@@
      }
    }
  
-   private final Map<HostAndPort,ScanStats> tserverScans = new HashMap<>();
-   private final Map<HostAndPort,ScanStats> sserverScans = new HashMap<>();
-   private final Map<HostAndPort,CompactionStats> allCompactions = new HashMap<>();
-   private final ExternalCompactionInfo ecInfo = new ExternalCompactionInfo();
- 
-   private long scansFetchedNanos = System.nanoTime();
-   private long compactsFetchedNanos = System.nanoTime();
-   private long ecInfoFetchedNanos = System.nanoTime();
-   private final long fetchTimeNanos = TimeUnit.MINUTES.toNanos(1);
-   private final long ageOffEntriesMillis = TimeUnit.MINUTES.toMillis(15);
-   // When there are a large amount of external compactions running the list of external compactions
-   // could consume a lot of memory. The purpose of this memoizing supplier is to try to avoid
-   // creating the list of running external compactions in memory per web request. If multiple
-   // request come in around the same time they should use the same list. It is still possible to
-   // have multiple list in memory if one request obtains a copy and then another request comes in
-   // after the timeout and the supplier recomputes the list. The longer the timeout on the supplier
-   // is the less likely we are to have multiple list of external compactions in memory, however
-   // increasing the timeout will make the monitor less responsive.
-   private final Supplier<ExternalCompactionsSnapshot> extCompactionSnapshot =
-       Suppliers.memoizeWithExpiration(() -> computeExternalCompactionsSnapshot(), fetchTimeNanos,
-           TimeUnit.NANOSECONDS);
+   private final long expirationTimeMinutes = 1;
+ 
+   // Use Suppliers.memoizeWithExpiration() to cache the results of expensive fetch operations. This
+   // avoids unnecessary repeated fetches within the expiration period and ensures that multiple
+   // requests around the same time use the same cached data.
+   private final Supplier<Map<HostAndPort,ScanStats>> tserverScansSupplier =
+       Suppliers.memoizeWithExpiration(this::fetchTServerScans, expirationTimeMinutes, MINUTES);
+ 
+   private final Supplier<Map<HostAndPort,ScanStats>> sserverScansSupplier =
+       Suppliers.memoizeWithExpiration(this::fetchSServerScans, expirationTimeMinutes, MINUTES);
+ 
+   private final Supplier<Map<HostAndPort,CompactionStats>> compactionsSupplier =
+       Suppliers.memoizeWithExpiration(this::fetchCompactions, expirationTimeMinutes, MINUTES);
+ 
+   private final Supplier<ExternalCompactionInfo> compactorInfoSupplier =
+       Suppliers.memoizeWithExpiration(this::fetchCompactorsInfo, expirationTimeMinutes, MINUTES);
+ 
+   private final Supplier<ExternalCompactionsSnapshot> externalCompactionsSupplier =
+       Suppliers.memoizeWithExpiration(this::computeExternalCompactionsSnapshot,
+           expirationTimeMinutes, MINUTES);
  
 -  private final RecentLogs recentLogs = new RecentLogs();
 -
    /**
-    * Fetch the active scans but only if fetchTimeNanos has elapsed.
+    * @return active tablet server scans. Values are cached and refresh after
+    *         {@link #expirationTimeMinutes}.
     */
-   public synchronized Map<HostAndPort,ScanStats> getScans() {
-     if (System.nanoTime() - scansFetchedNanos > fetchTimeNanos) {
-       log.info("User initiated fetch of Active TabletServer Scans");
-       fetchScans();
-     }
-     return Map.copyOf(tserverScans);
+   public Map<HostAndPort,ScanStats> getScans() {
+     return tserverScansSupplier.get();
    }
  
-   public synchronized Map<HostAndPort,ScanStats> getScanServerScans() {
-     if (System.nanoTime() - scansFetchedNanos > fetchTimeNanos) {
-       log.info("User initiated fetch of Active ScanServer Scans");
-       fetchScans();
-     }
-     return Map.copyOf(sserverScans);
+   /**
+    * @return active scan server scans. Values are cached and refresh after
+    *         {@link #expirationTimeMinutes}.
+    */
+   public Map<HostAndPort,ScanStats> getScanServerScans() {
+     return sserverScansSupplier.get();
    }
  
    /**
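
The suppliers above replace the hand-rolled "fetch only if stale" checks with
Guava's Suppliers.memoizeWithExpiration, which performs the staleness check
and recomputation internally, so the getters no longer need to be
synchronized. A minimal sketch of the idiom with the same one-minute expiry;
CachedFetch and expensiveFetch are illustrative names, not from the Accumulo
source:

  import java.util.concurrent.TimeUnit;

  import com.google.common.base.Supplier;
  import com.google.common.base.Suppliers;

  class CachedFetch {
    // Callers within the one-minute window share one cached result; the first
    // call after expiry recomputes it exactly once.
    private final Supplier<String> cached =
        Suppliers.memoizeWithExpiration(this::expensiveFetch, 1, TimeUnit.MINUTES);

    private String expensiveFetch() {
      // Stand-in for an expensive operation, e.g. polling every tablet server.
      return "fetched at " + System.nanoTime();
    }

    String get() {
      return cached.get();
    }
  }
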
@@@ -700,16 -746,46 +640,46 @@@
          ThriftUtil.returnClient(tserver, context);
        }
      }
-     // Age off old compaction information
-     var entryIter = allCompactions.entrySet().iterator();
-     // clock time used for fetched for date friendly display
-     long now = System.currentTimeMillis();
-     while (entryIter.hasNext()) {
-       var entry = entryIter.next();
-       if (now - entry.getValue().fetched > ageOffEntriesMillis) {
-         entryIter.remove();
-       }
+     return Collections.unmodifiableMap(allCompactions);
+   }
+ 
+   private ExternalCompactionInfo fetchCompactorsInfo() {
+     ServerContext context = getContext();
 -    Map<String,List<HostAndPort>> compactors = ExternalCompactionUtil.getCompactorAddrs(context);
++    Map<String,Set<HostAndPort>> compactors = ExternalCompactionUtil.getCompactorAddrs(context);
+     log.debug("Found compactors: {}", compactors);
+     ExternalCompactionInfo ecInfo = new ExternalCompactionInfo();
+     ecInfo.setFetchedTimeMillis(System.currentTimeMillis());
+     ecInfo.setCompactors(compactors);
+     ecInfo.setCoordinatorHost(coordinatorHost);
+     return ecInfo;
+   }
+ 
+   private static class ExternalCompactionsSnapshot {
+     public final RunningCompactions runningCompactions;
+     public final Map<String,TExternalCompaction> ecRunningMap;
+ 
+     private ExternalCompactionsSnapshot(Optional<Map<String,TExternalCompaction>> ecRunningMapOpt) {
+       this.ecRunningMap =
+           ecRunningMapOpt.map(Collections::unmodifiableMap).orElse(Collections.emptyMap());
+       this.runningCompactions = new RunningCompactions(this.ecRunningMap);
+     }
+   }
+ 
+   private ExternalCompactionsSnapshot computeExternalCompactionsSnapshot() {
+     if (coordinatorHost.isEmpty()) {
+       throw new IllegalStateException(coordinatorMissingMsg);
      }
+     var ccHost = coordinatorHost.orElseThrow();
+     log.info("User initiated fetch of running External Compactions from " + 
ccHost);
+     var client = getCoordinator(ccHost);
+     TExternalCompactionList running;
+     try {
+       running = client.getRunningCompactions(TraceUtil.traceInfo(), getContext().rpcCreds());
+     } catch (Exception e) {
+       throw new IllegalStateException("Unable to get running compactions from 
" + ccHost, e);
+     }
+ 
+     return new ExternalCompactionsSnapshot(Optional.ofNullable(running.getCompactions()));
    }
  
    /**
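
The snapshot class introduced above wraps a possibly-null Thrift result in an
unmodifiable map exactly once, so every web request served from the memoized
supplier shares the same immutable view instead of building its own copy. A
minimal sketch of that null-safe wrapping; Snapshot and rpcResult are
illustrative names, not from the Accumulo source:

  import java.util.Collections;
  import java.util.Map;
  import java.util.Optional;

  class Snapshot {
    final Map<String,String> running;

    // A null RPC result becomes an empty map; a non-null one is wrapped
    // unmodifiably so cached readers cannot mutate shared state.
    Snapshot(Map<String,String> rpcResult) {
      this.running = Optional.ofNullable(rpcResult)
          .map(Collections::unmodifiableMap)
          .orElse(Collections.emptyMap());
    }
  }
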
