This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new c475dbd02f Fixup major compactions on the Monitor (#4970) c475dbd02f is described below commit c475dbd02f323aafdcafb2ce37872ed3de2144dc Author: Dave Marion <dlmar...@apache.org> AuthorDate: Tue Oct 15 07:45:36 2024 -0400 Fixup major compactions on the Monitor (#4970) --- .../java/org/apache/accumulo/monitor/Monitor.java | 84 +++++++++++----------- .../monitor/rest/compactions/CompactionInfo.java | 7 +- .../rest/compactions/CompactionsResource.java | 9 +-- .../rest/statistics/StatisticsResource.java | 11 +++ 4 files changed, 59 insertions(+), 52 deletions(-) diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java index 48ee64dc51..99855f806e 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java @@ -78,7 +78,6 @@ import org.apache.accumulo.core.tabletserver.thrift.TabletServerClientService.Cl import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.Pair; -import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.monitor.rest.compactions.external.ExternalCompactionInfo; import org.apache.accumulo.monitor.rest.compactions.external.RunningCompactions; @@ -90,6 +89,7 @@ import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.problems.ProblemReports; import org.apache.accumulo.server.problems.ProblemType; import org.apache.accumulo.server.util.TableInfoUtil; +import org.apache.thrift.transport.TTransportException; import org.apache.zookeeper.KeeperException; import org.eclipse.jetty.servlet.DefaultServlet; import org.eclipse.jetty.servlet.ServletHolder; @@ -162,6 +162,7 @@ public class Monitor extends 
AbstractServer implements HighlyAvailableService { private final List<Pair<Long,Double>> ingestRateOverTime = newMaxList(); private final List<Pair<Long,Double>> ingestByteRateOverTime = newMaxList(); private final List<Pair<Long,Integer>> minorCompactionsOverTime = newMaxList(); + private final List<Pair<Long,Integer>> majorCompactionsOverTime = newMaxList(); private final List<Pair<Long,Double>> lookupsOverTime = newMaxList(); private final List<Pair<Long,Long>> queryRateOverTime = newMaxList(); private final List<Pair<Long,Long>> scanRateOverTime = newMaxList(); @@ -179,13 +180,9 @@ public class Monitor extends AbstractServer implements HighlyAvailableService { private Map<TableId,Map<ProblemType,Integer>> problemSummary = Collections.emptyMap(); private Exception problemException; private GCStatus gcStatus; - private Optional<HostAndPort> coordinatorHost = Optional.empty(); - private long coordinatorCheckNanos = 0L; - private CompactionCoordinatorService.Client coordinatorClient; + private volatile Optional<HostAndPort> coordinatorHost = Optional.empty(); private final String coordinatorMissingMsg = - "Error getting the compaction coordinator. Check that it is running. It is not " - + "started automatically with other cluster processes so must be started by running " - + "'accumulo compaction-coordinator'."; + "Error getting the compaction coordinator client. 
Check that the Manager is running."; private EmbeddedWebServer server; private int livePort = 0; @@ -282,11 +279,25 @@ public class Monitor extends AbstractServer implements HighlyAvailableService { if (client != null) { mmi = client.getManagerStats(TraceUtil.traceInfo(), context.rpcCreds()); retry = false; + // Now that Manager is up, set the coordinator host + Set<ServerId> managers = context.instanceOperations().getServers(ServerId.Type.MANAGER); + if (managers == null || managers.isEmpty()) { + throw new IllegalStateException( + "io.getServers returned nothing for Manager, but it's up."); + } + ServerId manager = managers.iterator().next(); + Optional<HostAndPort> nextCoordinatorHost = + Optional.of(HostAndPort.fromString(manager.toHostPortString())); + if (coordinatorHost.isEmpty() + || !coordinatorHost.orElseThrow().equals(nextCoordinatorHost.orElseThrow())) { + coordinatorHost = nextCoordinatorHost; + } } else { mmi = null; log.error("Unable to get info from Manager"); } gcStatus = fetchGcStatus(); + } catch (Exception e) { mmi = null; log.info("Error fetching stats: ", e); @@ -299,6 +310,7 @@ public class Monitor extends AbstractServer implements HighlyAvailableService { sleepUninterruptibly(1, TimeUnit.SECONDS); } } + if (mmi != null) { int minorCompactions = 0; @@ -364,6 +376,8 @@ public class Monitor extends AbstractServer implements HighlyAvailableService { loadOverTime.add(new Pair<>(currentTime, totalLoad)); minorCompactionsOverTime.add(new Pair<>(currentTime, minorCompactions)); + majorCompactionsOverTime + .add(new Pair<>(currentTime, getRunnningCompactions().running.size())); lookupsOverTime.add(new Pair<>(currentTime, lookupRateTracker.calculateRate())); @@ -386,22 +400,7 @@ public class Monitor extends AbstractServer implements HighlyAvailableService { this.problemException = e; } - // check for compaction coordinator host and only notify its discovery - Optional<HostAndPort> previousHost; - if (System.nanoTime() - coordinatorCheckNanos > 
fetchTimeNanos) { - previousHost = coordinatorHost; - coordinatorHost = ExternalCompactionUtil.findCompactionCoordinator(context); - coordinatorCheckNanos = System.nanoTime(); - if (previousHost.isEmpty() && coordinatorHost.isPresent()) { - log.info("External Compaction Coordinator found at {}", coordinatorHost.orElseThrow()); - } - } - } finally { - if (coordinatorClient != null) { - ThriftUtil.returnClient(coordinatorClient, context); - coordinatorClient = null; - } lastRecalc.set(currentTime); // stop fetching; log an error if this thread wasn't already fetching if (!fetching.compareAndSet(true, false)) { @@ -698,15 +697,25 @@ public class Monitor extends AbstractServer implements HighlyAvailableService { } var ccHost = coordinatorHost.orElseThrow(); log.info("User initiated fetch of running External Compactions from " + ccHost); - var client = getCoordinator(ccHost); - TExternalCompactionList running; try { - running = client.getRunningCompactions(TraceUtil.traceInfo(), getContext().rpcCreds()); - } catch (Exception e) { - throw new IllegalStateException("Unable to get running compactions from " + ccHost, e); + CompactionCoordinatorService.Client client = + ThriftUtil.getClient(ThriftClientTypes.COORDINATOR, ccHost, getContext()); + TExternalCompactionList running; + try { + running = client.getRunningCompactions(TraceUtil.traceInfo(), getContext().rpcCreds()); + return new ExternalCompactionsSnapshot( + running.getCompactions() == null ? 
Map.of() : running.getCompactions()); + } catch (Exception e) { + throw new IllegalStateException("Unable to get running compactions from " + ccHost, e); + } finally { + if (client != null) { + ThriftUtil.returnClient(client, getContext()); + } + } + } catch (TTransportException e) { + log.error("Unable to get Compaction coordinator at {}", ccHost); + throw new IllegalStateException(coordinatorMissingMsg, e); } - - return new ExternalCompactionsSnapshot(running.getCompactions()); } public RunningCompactions getRunnningCompactions() { @@ -722,19 +731,6 @@ public class Monitor extends AbstractServer implements HighlyAvailableService { return new RunningCompactorDetails(extCompaction); } - private CompactionCoordinatorService.Client getCoordinator(HostAndPort address) { - if (coordinatorClient == null) { - try { - coordinatorClient = - ThriftUtil.getClient(ThriftClientTypes.COORDINATOR, address, getContext()); - } catch (Exception e) { - log.error("Unable to get Compaction coordinator at {}", address); - throw new IllegalStateException(coordinatorMissingMsg, e); - } - } - return coordinatorClient; - } - private void fetchScans() { final ServerContext context = getContext(); final Set<ServerId> servers = new HashSet<>(); @@ -998,6 +994,10 @@ public class Monitor extends AbstractServer implements HighlyAvailableService { return new ArrayList<>(minorCompactionsOverTime); } + public List<Pair<Long,Integer>> getMajorCompactionsOverTime() { + return new ArrayList<>(majorCompactionsOverTime); + } + public List<Pair<Long,Double>> getLookupsOverTime() { return new ArrayList<>(lookupsOverTime); } diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/CompactionInfo.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/CompactionInfo.java index 5a71fa65e5..f222bc42ed 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/CompactionInfo.java +++ 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/CompactionInfo.java @@ -18,9 +18,10 @@ */ package org.apache.accumulo.monitor.rest.compactions; -import org.apache.accumulo.core.manager.thrift.TabletServerStatus; import org.apache.accumulo.monitor.Monitor; +import com.google.common.net.HostAndPort; + /** * Generates a compaction info JSON object * @@ -40,8 +41,8 @@ public class CompactionInfo { /** * Stores new compaction information */ - public CompactionInfo(TabletServerStatus tserverInfo, Monitor.CompactionStats stats) { - this.server = tserverInfo.getName(); + public CompactionInfo(HostAndPort address, Monitor.CompactionStats stats) { + this.server = address.toString(); this.fetched = stats.fetched; this.count = stats.count; this.oldest = stats.oldest; diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/CompactionsResource.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/CompactionsResource.java index 8ceff7f971..b96f9f774d 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/CompactionsResource.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/CompactionsResource.java @@ -27,7 +27,6 @@ import jakarta.ws.rs.Produces; import jakarta.ws.rs.core.MediaType; import org.apache.accumulo.core.manager.thrift.ManagerMonitorInfo; -import org.apache.accumulo.core.manager.thrift.TabletServerStatus; import org.apache.accumulo.monitor.Monitor; import com.google.common.net.HostAndPort; @@ -59,12 +58,8 @@ public class CompactionsResource { Map<HostAndPort,Monitor.CompactionStats> entry = monitor.getCompactions(); - for (TabletServerStatus tserverInfo : mmi.getTServerInfo()) { - var stats = entry.get(HostAndPort.fromString(tserverInfo.name)); - if (stats != null) { - compactions.addCompaction(new CompactionInfo(tserverInfo, stats)); - } - } + entry.forEach((k, v) -> compactions.addCompaction(new CompactionInfo(k, v))); 
+ return compactions; } } diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/statistics/StatisticsResource.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/statistics/StatisticsResource.java index 13573379ec..8e43e33567 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/statistics/StatisticsResource.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/statistics/StatisticsResource.java @@ -251,6 +251,17 @@ public class StatisticsResource { return monitor.getMinorCompactionsOverTime(); } + /** + * Generates a list with the major compactions over time + * + * @return Major compactions over time + */ + @GET + @Path("time/majorCompactions") + public List<Pair<Long,Integer>> getMajorCompactions() { + return monitor.getMajorCompactionsOverTime(); + } + /** * Generates a list with the lookups over time *