This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new c475dbd02f Fixup major compactions on the Monitor (#4970)
c475dbd02f is described below

commit c475dbd02f323aafdcafb2ce37872ed3de2144dc
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Tue Oct 15 07:45:36 2024 -0400

    Fixup major compactions on the Monitor (#4970)
---
 .../java/org/apache/accumulo/monitor/Monitor.java  | 84 +++++++++++-----------
 .../monitor/rest/compactions/CompactionInfo.java   |  7 +-
 .../rest/compactions/CompactionsResource.java      |  9 +--
 .../rest/statistics/StatisticsResource.java        | 11 +++
 4 files changed, 59 insertions(+), 52 deletions(-)

diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
index 48ee64dc51..99855f806e 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
@@ -78,7 +78,6 @@ import org.apache.accumulo.core.tabletserver.thrift.TabletServerClientService.Cl
 import org.apache.accumulo.core.trace.TraceUtil;
 import org.apache.accumulo.core.util.Halt;
 import org.apache.accumulo.core.util.Pair;
-import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
 import org.apache.accumulo.core.util.threads.Threads;
 import 
org.apache.accumulo.monitor.rest.compactions.external.ExternalCompactionInfo;
 import 
org.apache.accumulo.monitor.rest.compactions.external.RunningCompactions;
@@ -90,6 +89,7 @@ import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.problems.ProblemReports;
 import org.apache.accumulo.server.problems.ProblemType;
 import org.apache.accumulo.server.util.TableInfoUtil;
+import org.apache.thrift.transport.TTransportException;
 import org.apache.zookeeper.KeeperException;
 import org.eclipse.jetty.servlet.DefaultServlet;
 import org.eclipse.jetty.servlet.ServletHolder;
@@ -162,6 +162,7 @@ public class Monitor extends AbstractServer implements HighlyAvailableService {
   private final List<Pair<Long,Double>> ingestRateOverTime = newMaxList();
   private final List<Pair<Long,Double>> ingestByteRateOverTime = newMaxList();
   private final List<Pair<Long,Integer>> minorCompactionsOverTime = 
newMaxList();
+  private final List<Pair<Long,Integer>> majorCompactionsOverTime = 
newMaxList();
   private final List<Pair<Long,Double>> lookupsOverTime = newMaxList();
   private final List<Pair<Long,Long>> queryRateOverTime = newMaxList();
   private final List<Pair<Long,Long>> scanRateOverTime = newMaxList();
@@ -179,13 +180,9 @@ public class Monitor extends AbstractServer implements HighlyAvailableService {
   private Map<TableId,Map<ProblemType,Integer>> problemSummary = 
Collections.emptyMap();
   private Exception problemException;
   private GCStatus gcStatus;
-  private Optional<HostAndPort> coordinatorHost = Optional.empty();
-  private long coordinatorCheckNanos = 0L;
-  private CompactionCoordinatorService.Client coordinatorClient;
+  private volatile Optional<HostAndPort> coordinatorHost = Optional.empty();
   private final String coordinatorMissingMsg =
-      "Error getting the compaction coordinator. Check that it is running. It 
is not "
-          + "started automatically with other cluster processes so must be 
started by running "
-          + "'accumulo compaction-coordinator'.";
+      "Error getting the compaction coordinator client. Check that the Manager 
is running.";
 
   private EmbeddedWebServer server;
   private int livePort = 0;
@@ -282,11 +279,25 @@ public class Monitor extends AbstractServer implements HighlyAvailableService {
           if (client != null) {
             mmi = client.getManagerStats(TraceUtil.traceInfo(), 
context.rpcCreds());
             retry = false;
+            // Now that Manager is up, set the coordinator host
+            Set<ServerId> managers = 
context.instanceOperations().getServers(ServerId.Type.MANAGER);
+            if (managers == null || managers.isEmpty()) {
+              throw new IllegalStateException(
+                  "io.getServers returned nothing for Manager, but it's up.");
+            }
+            ServerId manager = managers.iterator().next();
+            Optional<HostAndPort> nextCoordinatorHost =
+                
Optional.of(HostAndPort.fromString(manager.toHostPortString()));
+            if (coordinatorHost.isEmpty()
+                || 
!coordinatorHost.orElseThrow().equals(nextCoordinatorHost.orElseThrow())) {
+              coordinatorHost = nextCoordinatorHost;
+            }
           } else {
             mmi = null;
             log.error("Unable to get info from Manager");
           }
           gcStatus = fetchGcStatus();
+
         } catch (Exception e) {
           mmi = null;
           log.info("Error fetching stats: ", e);
@@ -299,6 +310,7 @@ public class Monitor extends AbstractServer implements HighlyAvailableService {
           sleepUninterruptibly(1, TimeUnit.SECONDS);
         }
       }
+
       if (mmi != null) {
         int minorCompactions = 0;
 
@@ -364,6 +376,8 @@ public class Monitor extends AbstractServer implements HighlyAvailableService {
         loadOverTime.add(new Pair<>(currentTime, totalLoad));
 
         minorCompactionsOverTime.add(new Pair<>(currentTime, 
minorCompactions));
+        majorCompactionsOverTime
+            .add(new Pair<>(currentTime, 
getRunnningCompactions().running.size()));
 
         lookupsOverTime.add(new Pair<>(currentTime, 
lookupRateTracker.calculateRate()));
 
@@ -386,22 +400,7 @@ public class Monitor extends AbstractServer implements HighlyAvailableService {
         this.problemException = e;
       }
 
-      // check for compaction coordinator host and only notify its discovery
-      Optional<HostAndPort> previousHost;
-      if (System.nanoTime() - coordinatorCheckNanos > fetchTimeNanos) {
-        previousHost = coordinatorHost;
-        coordinatorHost = 
ExternalCompactionUtil.findCompactionCoordinator(context);
-        coordinatorCheckNanos = System.nanoTime();
-        if (previousHost.isEmpty() && coordinatorHost.isPresent()) {
-          log.info("External Compaction Coordinator found at {}", 
coordinatorHost.orElseThrow());
-        }
-      }
-
     } finally {
-      if (coordinatorClient != null) {
-        ThriftUtil.returnClient(coordinatorClient, context);
-        coordinatorClient = null;
-      }
       lastRecalc.set(currentTime);
       // stop fetching; log an error if this thread wasn't already fetching
       if (!fetching.compareAndSet(true, false)) {
@@ -698,15 +697,25 @@ public class Monitor extends AbstractServer implements HighlyAvailableService {
     }
     var ccHost = coordinatorHost.orElseThrow();
     log.info("User initiated fetch of running External Compactions from " + 
ccHost);
-    var client = getCoordinator(ccHost);
-    TExternalCompactionList running;
     try {
-      running = client.getRunningCompactions(TraceUtil.traceInfo(), 
getContext().rpcCreds());
-    } catch (Exception e) {
-      throw new IllegalStateException("Unable to get running compactions from 
" + ccHost, e);
+      CompactionCoordinatorService.Client client =
+          ThriftUtil.getClient(ThriftClientTypes.COORDINATOR, ccHost, 
getContext());
+      TExternalCompactionList running;
+      try {
+        running = client.getRunningCompactions(TraceUtil.traceInfo(), 
getContext().rpcCreds());
+        return new ExternalCompactionsSnapshot(
+            running.getCompactions() == null ? Map.of() : 
running.getCompactions());
+      } catch (Exception e) {
+        throw new IllegalStateException("Unable to get running compactions 
from " + ccHost, e);
+      } finally {
+        if (client != null) {
+          ThriftUtil.returnClient(client, getContext());
+        }
+      }
+    } catch (TTransportException e) {
+      log.error("Unable to get Compaction coordinator at {}", ccHost);
+      throw new IllegalStateException(coordinatorMissingMsg, e);
     }
-
-    return new ExternalCompactionsSnapshot(running.getCompactions());
   }
 
   public RunningCompactions getRunnningCompactions() {
@@ -722,19 +731,6 @@ public class Monitor extends AbstractServer implements HighlyAvailableService {
     return new RunningCompactorDetails(extCompaction);
   }
 
-  private CompactionCoordinatorService.Client getCoordinator(HostAndPort 
address) {
-    if (coordinatorClient == null) {
-      try {
-        coordinatorClient =
-            ThriftUtil.getClient(ThriftClientTypes.COORDINATOR, address, 
getContext());
-      } catch (Exception e) {
-        log.error("Unable to get Compaction coordinator at {}", address);
-        throw new IllegalStateException(coordinatorMissingMsg, e);
-      }
-    }
-    return coordinatorClient;
-  }
-
   private void fetchScans() {
     final ServerContext context = getContext();
     final Set<ServerId> servers = new HashSet<>();
@@ -998,6 +994,10 @@ public class Monitor extends AbstractServer implements HighlyAvailableService {
     return new ArrayList<>(minorCompactionsOverTime);
   }
 
+  public List<Pair<Long,Integer>> getMajorCompactionsOverTime() {
+    return new ArrayList<>(majorCompactionsOverTime);
+  }
+
   public List<Pair<Long,Double>> getLookupsOverTime() {
     return new ArrayList<>(lookupsOverTime);
   }
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/CompactionInfo.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/CompactionInfo.java
index 5a71fa65e5..f222bc42ed 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/CompactionInfo.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/CompactionInfo.java
@@ -18,9 +18,10 @@
  */
 package org.apache.accumulo.monitor.rest.compactions;
 
-import org.apache.accumulo.core.manager.thrift.TabletServerStatus;
 import org.apache.accumulo.monitor.Monitor;
 
+import com.google.common.net.HostAndPort;
+
 /**
  * Generates a compaction info JSON object
  *
@@ -40,8 +41,8 @@ public class CompactionInfo {
   /**
    * Stores new compaction information
    */
-  public CompactionInfo(TabletServerStatus tserverInfo, 
Monitor.CompactionStats stats) {
-    this.server = tserverInfo.getName();
+  public CompactionInfo(HostAndPort address, Monitor.CompactionStats stats) {
+    this.server = address.toString();
     this.fetched = stats.fetched;
     this.count = stats.count;
     this.oldest = stats.oldest;
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/CompactionsResource.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/CompactionsResource.java
index 8ceff7f971..b96f9f774d 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/CompactionsResource.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/CompactionsResource.java
@@ -27,7 +27,6 @@ import jakarta.ws.rs.Produces;
 import jakarta.ws.rs.core.MediaType;
 
 import org.apache.accumulo.core.manager.thrift.ManagerMonitorInfo;
-import org.apache.accumulo.core.manager.thrift.TabletServerStatus;
 import org.apache.accumulo.monitor.Monitor;
 
 import com.google.common.net.HostAndPort;
@@ -59,12 +58,8 @@ public class CompactionsResource {
 
     Map<HostAndPort,Monitor.CompactionStats> entry = monitor.getCompactions();
 
-    for (TabletServerStatus tserverInfo : mmi.getTServerInfo()) {
-      var stats = entry.get(HostAndPort.fromString(tserverInfo.name));
-      if (stats != null) {
-        compactions.addCompaction(new CompactionInfo(tserverInfo, stats));
-      }
-    }
+    entry.forEach((k, v) -> compactions.addCompaction(new CompactionInfo(k, 
v)));
+
     return compactions;
   }
 }
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/statistics/StatisticsResource.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/statistics/StatisticsResource.java
index 13573379ec..8e43e33567 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/statistics/StatisticsResource.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/statistics/StatisticsResource.java
@@ -251,6 +251,17 @@ public class StatisticsResource {
     return monitor.getMinorCompactionsOverTime();
   }
 
+  /**
+   * Generates a list with the major compactions over time
+   *
+   * @return Major compactions over time
+   */
+  @GET
+  @Path("time/majorCompactions")
+  public List<Pair<Long,Integer>> getMajorCompactions() {
+    return monitor.getMajorCompactionsOverTime();
+  }
+
   /**
    * Generates a list with the lookups over time
    *

Reply via email to