This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch 3.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit deefe4d1e68a9bc0c79fd0647cc5243238f60de0 Merge: 98cbd85f85 22831aae20 Author: Keith Turner <ktur...@apache.org> AuthorDate: Thu Jan 16 23:50:31 2025 +0000 Merge branch '2.1' into 3.1 .../org/apache/accumulo/core/file/rfile/RFile.java | 21 +- .../java/org/apache/accumulo/monitor/Monitor.java | 260 +++++++++------------ .../rest/compactions/external/ECResource.java | 2 +- .../accumulo/shell/commands/HelpCommand.java | 3 +- 4 files changed, 133 insertions(+), 153 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java index 150fcf4757,68e2be016d..8737c67241 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java @@@ -1305,9 -1303,13 +1309,13 @@@ public class RFile } @Override - public void closeDeepCopies() { + public void closeDeepCopies() throws IOException { + closeDeepCopies(false); + } + + private void closeDeepCopies(boolean ignoreIOExceptions) throws IOException { if (deepCopy) { - throw new RuntimeException("Calling closeDeepCopies on a deep copy is not supported"); + throw new IllegalStateException("Calling closeDeepCopies on a deep copy is not supported"); } for (Reader deepCopy : deepCopies) { @@@ -1320,11 -1322,12 +1328,12 @@@ @Override public void close() throws IOException { if (deepCopy) { - throw new RuntimeException("Calling close on a deep copy is not supported"); + throw new IllegalStateException("Calling close on a deep copy is not supported"); } - closeDeepCopies(); - closeLocalityGroupReaders(); + // Closes as much as possible ignoring and logging exceptions along the way + closeDeepCopies(true); + closeLocalityGroupReaders(true); if (sampleReaders != null) { for (LocalityGroupReader lgr : sampleReaders) { diff --cc server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java index cb09d74095,548d2a985c..2fe6e2d2b7 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java +++ 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java @@@ -18,17 -18,21 +18,18 @@@ */ package org.apache.accumulo.monitor; +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; import static java.nio.charset.StandardCharsets.UTF_8; -import static java.util.concurrent.TimeUnit.HOURS; + import static java.util.concurrent.TimeUnit.MINUTES; -import static org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly; import java.net.InetAddress; import java.net.URL; import java.net.UnknownHostException; -import java.util.ArrayList; import java.util.Arrays; + import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; - import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@@ -65,12 -67,16 +66,13 @@@ import org.apache.accumulo.core.metadat import org.apache.accumulo.core.metrics.MetricsInfo; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; +import org.apache.accumulo.core.tabletscan.thrift.ActiveScan; +import org.apache.accumulo.core.tabletscan.thrift.TabletScanClientService; import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction; -import org.apache.accumulo.core.tabletserver.thrift.ActiveScan; -import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client; -import org.apache.accumulo.core.tabletserver.thrift.TabletScanClientService; +import org.apache.accumulo.core.tabletserver.thrift.TabletServerClientService.Client; import org.apache.accumulo.core.trace.TraceUtil; -import org.apache.accumulo.core.util.HostAndPort; import org.apache.accumulo.core.util.Pair; -import org.apache.accumulo.core.util.ServerServices; -import org.apache.accumulo.core.util.ServerServices.Service; + import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; import 
org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.monitor.rest.compactions.external.ExternalCompactionInfo; @@@ -138,9 -176,11 +140,9 @@@ public class Monitor extends AbstractSe private final AtomicBoolean fetching = new AtomicBoolean(false); private ManagerMonitorInfo mmi; - private Map<TableId,Map<ProblemType,Integer>> problemSummary = Collections.emptyMap(); - private Exception problemException; private GCStatus gcStatus; private Optional<HostAndPort> coordinatorHost = Optional.empty(); - private long coordinatorCheckNanos = 0L; + private Timer coordinatorCheck = null; private CompactionCoordinatorService.Client coordinatorClient; private final String coordinatorMissingMsg = "Error getting the compaction coordinator. Check that it is running. It is not " @@@ -507,45 -612,43 +508,41 @@@ } } - private final Map<HostAndPort,ScanStats> tserverScans = new HashMap<>(); - private final Map<HostAndPort,ScanStats> sserverScans = new HashMap<>(); - private final Map<HostAndPort,CompactionStats> allCompactions = new HashMap<>(); - private final ExternalCompactionInfo ecInfo = new ExternalCompactionInfo(); - - private long scansFetchedNanos = System.nanoTime(); - private long compactsFetchedNanos = System.nanoTime(); - private long ecInfoFetchedNanos = System.nanoTime(); - private final long fetchTimeNanos = TimeUnit.MINUTES.toNanos(1); - private final long ageOffEntriesMillis = TimeUnit.MINUTES.toMillis(15); - // When there are a large amount of external compactions running the list of external compactions - // could consume a lot of memory. The purpose of this memoizing supplier is to try to avoid - // creating the list of running external compactions in memory per web request. If multiple - // request come in around the same time they should use the same list. It is still possible to - // have multiple list in memory if one request obtains a copy and then another request comes in - // after the timeout and the supplier recomputes the list. 
The longer the timeout on the supplier - // is the less likely we are to have multiple list of external compactions in memory, however - // increasing the timeout will make the monitor less responsive. - private final Supplier<ExternalCompactionsSnapshot> extCompactionSnapshot = - Suppliers.memoizeWithExpiration(() -> computeExternalCompactionsSnapshot(), fetchTimeNanos, - TimeUnit.NANOSECONDS); + private final long expirationTimeMinutes = 1; + + // Use Suppliers.memoizeWithExpiration() to cache the results of expensive fetch operations. This + // avoids unnecessary repeated fetches within the expiration period and ensures that multiple + // requests around the same time use the same cached data. + private final Supplier<Map<HostAndPort,ScanStats>> tserverScansSupplier = + Suppliers.memoizeWithExpiration(this::fetchTServerScans, expirationTimeMinutes, MINUTES); + + private final Supplier<Map<HostAndPort,ScanStats>> sserverScansSupplier = + Suppliers.memoizeWithExpiration(this::fetchSServerScans, expirationTimeMinutes, MINUTES); + + private final Supplier<Map<HostAndPort,CompactionStats>> compactionsSupplier = + Suppliers.memoizeWithExpiration(this::fetchCompactions, expirationTimeMinutes, MINUTES); + + private final Supplier<ExternalCompactionInfo> compactorInfoSupplier = + Suppliers.memoizeWithExpiration(this::fetchCompactorsInfo, expirationTimeMinutes, MINUTES); + + private final Supplier<ExternalCompactionsSnapshot> externalCompactionsSupplier = + Suppliers.memoizeWithExpiration(this::computeExternalCompactionsSnapshot, + expirationTimeMinutes, MINUTES); - private final RecentLogs recentLogs = new RecentLogs(); - /** - * Fetch the active scans but only if fetchTimeNanos has elapsed. + * @return active tablet server scans. Values are cached and refresh after + * {@link #expirationTimeMinutes}. 
*/ - public synchronized Map<HostAndPort,ScanStats> getScans() { - if (System.nanoTime() - scansFetchedNanos > fetchTimeNanos) { - log.info("User initiated fetch of Active TabletServer Scans"); - fetchScans(); - } - return Map.copyOf(tserverScans); + public Map<HostAndPort,ScanStats> getScans() { + return tserverScansSupplier.get(); } - public synchronized Map<HostAndPort,ScanStats> getScanServerScans() { - if (System.nanoTime() - scansFetchedNanos > fetchTimeNanos) { - log.info("User initiated fetch of Active ScanServer Scans"); - fetchScans(); - } - return Map.copyOf(sserverScans); + /** + * @return active scan server scans. Values are cached and refresh after + * {@link #expirationTimeMinutes}. + */ + public Map<HostAndPort,ScanStats> getScanServerScans() { + return sserverScansSupplier.get(); } /** @@@ -700,16 -746,46 +640,46 @@@ ThriftUtil.returnClient(tserver, context); } } - // Age off old compaction information - var entryIter = allCompactions.entrySet().iterator(); - // clock time used for fetched for date friendly display - long now = System.currentTimeMillis(); - while (entryIter.hasNext()) { - var entry = entryIter.next(); - if (now - entry.getValue().fetched > ageOffEntriesMillis) { - entryIter.remove(); - } + return Collections.unmodifiableMap(allCompactions); + } + + private ExternalCompactionInfo fetchCompactorsInfo() { + ServerContext context = getContext(); - Map<String,List<HostAndPort>> compactors = ExternalCompactionUtil.getCompactorAddrs(context); ++ Map<String,Set<HostAndPort>> compactors = ExternalCompactionUtil.getCompactorAddrs(context); + log.debug("Found compactors: {}", compactors); + ExternalCompactionInfo ecInfo = new ExternalCompactionInfo(); + ecInfo.setFetchedTimeMillis(System.currentTimeMillis()); + ecInfo.setCompactors(compactors); + ecInfo.setCoordinatorHost(coordinatorHost); + return ecInfo; + } + + private static class ExternalCompactionsSnapshot { + public final RunningCompactions runningCompactions; + public final 
Map<String,TExternalCompaction> ecRunningMap; + + private ExternalCompactionsSnapshot(Optional<Map<String,TExternalCompaction>> ecRunningMapOpt) { + this.ecRunningMap = + ecRunningMapOpt.map(Collections::unmodifiableMap).orElse(Collections.emptyMap()); + this.runningCompactions = new RunningCompactions(this.ecRunningMap); + } + } + + private ExternalCompactionsSnapshot computeExternalCompactionsSnapshot() { + if (coordinatorHost.isEmpty()) { + throw new IllegalStateException(coordinatorMissingMsg); } + var ccHost = coordinatorHost.orElseThrow(); + log.info("User initiated fetch of running External Compactions from " + ccHost); + var client = getCoordinator(ccHost); + TExternalCompactionList running; + try { + running = client.getRunningCompactions(TraceUtil.traceInfo(), getContext().rpcCreds()); + } catch (Exception e) { + throw new IllegalStateException("Unable to get running compactions from " + ccHost, e); + } + + return new ExternalCompactionsSnapshot(Optional.ofNullable(running.getCompactions())); } /**