This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 4977f1277c Move code from Manager class to compaction coordinator (#4011)
4977f1277c is described below

commit 4977f1277c1a8ecefefaa804ba40cabea3d90daf
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Mon Dec 18 19:35:57 2023 -0500

    Move code from Manager class to compaction coordinator (#4011)
    
    Moves code related to compaction coordination out of the manager class and into the compaction coordinator class.
    
    Co-authored-by: Daniel Roberts <ddani...@gmail.com>
---
 .../java/org/apache/accumulo/manager/Manager.java  | 42 +++-----------
 .../accumulo/manager/TabletGroupWatcher.java       |  2 +-
 .../coordinator/CompactionCoordinator.java         | 66 ++++++++++++++++------
 .../coordinator}/QueueMetrics.java                 | 13 +++--
 .../accumulo/manager/metrics/ManagerMetrics.java   |  3 -
 5 files changed, 66 insertions(+), 60 deletions(-)

diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index b23e0a6bdb..2418dba1fe 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -113,7 +113,6 @@ import org.apache.accumulo.core.util.Retry;
 import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.core.util.threads.Threads;
 import 
org.apache.accumulo.manager.compaction.coordinator.CompactionCoordinator;
-import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues;
 import org.apache.accumulo.manager.metrics.ManagerMetrics;
 import org.apache.accumulo.manager.recovery.RecoveryManager;
 import org.apache.accumulo.manager.split.Splitter;
@@ -162,7 +161,6 @@ import com.google.common.util.concurrent.RateLimiter;
 import com.google.common.util.concurrent.Uninterruptibles;
 
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import io.micrometer.core.instrument.Gauge;
 import io.micrometer.core.instrument.MeterRegistry;
 import io.opentelemetry.api.trace.Span;
 import io.opentelemetry.context.Scope;
@@ -538,16 +536,14 @@ public class Manager extends AbstractServer
     return splitter;
   }
 
-  private CompactionJobQueues compactionJobQueues;
-
-  public CompactionJobQueues getCompactionQueues() {
-    return compactionJobQueues;
-  }
-
   public UpgradeCoordinator.UpgradeStatus getUpgradeStatus() {
     return upgradeCoordinator.getStatus();
   }
 
+  public CompactionCoordinator getCompactionCoordinator() {
+    return compactionCoordinator;
+  }
+
   private class MigrationCleanupThread implements Runnable {
 
     @Override
@@ -932,17 +928,13 @@ public class Manager extends AbstractServer
     final ServerContext context = getContext();
     final String zroot = getZooKeeperRoot();
 
-    this.compactionJobQueues = new CompactionJobQueues(
-        
getConfiguration().getCount(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE));
-
     // ACCUMULO-4424 Put up the Thrift servers before getting the lock as a 
sign of process health
     // when a hot-standby
     //
     // Start the Manager's Fate Service
     fateServiceHandler = new FateServiceHandler(this);
     managerClientHandler = new ManagerClientServiceHandler(this);
-    compactionCoordinator =
-        new CompactionCoordinator(context, tserverSet, security, 
compactionJobQueues, nextEvent);
+    compactionCoordinator = new CompactionCoordinator(context, tserverSet, 
security, nextEvent);
     // Start the Manager's Client service
     // Ensure that calls before the manager gets the lock fail
     ManagerClientService.Iface haProxy =
@@ -950,7 +942,7 @@ public class Manager extends AbstractServer
 
     ServerAddress sa;
     var processor = 
ThriftProcessorTypes.getManagerTProcessor(fateServiceHandler,
-        compactionCoordinator, haProxy, getContext());
+        compactionCoordinator.getThriftService(), haProxy, getContext());
 
     try {
       sa = TServerUtils.startServer(context, getHostname(), 
Property.MANAGER_CLIENTPORT, processor,
@@ -1008,10 +1000,8 @@ public class Manager extends AbstractServer
       Thread.currentThread().interrupt();
     }
 
-    // Don't call run on the CompactionCoordinator until we have tservers.
-    Thread compactionCoordinatorThread =
-        Threads.createThread("CompactionCoordinator Thread", 
compactionCoordinator);
-    compactionCoordinatorThread.start();
+    // Don't start the CompactionCoordinator until we have tservers.
+    compactionCoordinator.start();
 
     ZooReaderWriter zReaderWriter = context.getZooReaderWriter();
 
@@ -1163,11 +1153,6 @@ public class Manager extends AbstractServer
     tableInformationStatusPool.shutdownNow();
 
     compactionCoordinator.shutdown();
-    try {
-      compactionCoordinatorThread.join();
-    } catch (InterruptedException e) {
-      log.error("Exception compaction coordinator thread", e);
-    }
 
     // Signal that we want it to stop, and wait for it to do so.
     if (authenticationTokenKeyManager != null) {
@@ -1389,8 +1374,6 @@ public class Manager extends AbstractServer
   public void update(LiveTServerSet current, Set<TServerInstance> deleted,
       Set<TServerInstance> added) {
 
-    compactionCoordinator.updateTServerSet(current, deleted, added);
-
     // if we have deleted or added tservers, then adjust our dead server list
     if (!deleted.isEmpty() || !added.isEmpty()) {
       DeadServerList obit = new DeadServerList(getContext());
@@ -1671,13 +1654,6 @@ public class Manager extends AbstractServer
   @Override
   public void registerMetrics(MeterRegistry registry) {
     super.registerMetrics(registry);
-    Gauge.builder(METRICS_MAJC_QUEUED, compactionJobQueues, 
CompactionJobQueues::getQueuedJobCount)
-        .description("Number of queued major compactions").register(registry);
-    Gauge
-        .builder(METRICS_MAJC_RUNNING, compactionCoordinator,
-            CompactionCoordinator::getNumRunningCompactions)
-        .description("Number of running major compactions").register(registry);
-
+    compactionCoordinator.registerMetrics(registry);
   }
-
 }
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
index a217f870b7..3d9e840af2 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
@@ -534,7 +534,7 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
         var jobs = compactionGenerator.generateJobs(tm,
             TabletManagementIterator.determineCompactionKinds(actions));
         LOG.debug("{} may need compacting adding {} jobs", tm.getExtent(), 
jobs.size());
-        manager.getCompactionQueues().add(tm, jobs);
+        manager.getCompactionCoordinator().addJobs(tm, jobs);
       }
 
       // ELASITICITY_TODO the case where a planner generates compactions at 
time T1 for tablet
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index d020c1d662..6d2a51c46f 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -82,7 +82,6 @@ import org.apache.accumulo.core.metadata.AbstractTabletFile;
 import org.apache.accumulo.core.metadata.CompactableFileImpl;
 import org.apache.accumulo.core.metadata.ReferencedTabletFile;
 import org.apache.accumulo.core.metadata.StoredTabletFile;
-import org.apache.accumulo.core.metadata.TServerInstance;
 import org.apache.accumulo.core.metadata.schema.Ample;
 import org.apache.accumulo.core.metadata.schema.Ample.Refreshes.RefreshEntry;
 import org.apache.accumulo.core.metadata.schema.Ample.RejectionHandler;
@@ -92,6 +91,7 @@ import 
org.apache.accumulo.core.metadata.schema.ExternalCompactionMetadata;
 import org.apache.accumulo.core.metadata.schema.SelectedFiles;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
+import org.apache.accumulo.core.metrics.MetricsProducer;
 import org.apache.accumulo.core.rpc.ThriftUtil;
 import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
 import org.apache.accumulo.core.spi.compaction.CompactionJob;
@@ -109,6 +109,7 @@ import 
org.apache.accumulo.core.util.compaction.CompactionExecutorIdImpl;
 import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
 import org.apache.accumulo.core.util.compaction.RunningCompaction;
 import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.core.util.threads.Threads;
 import org.apache.accumulo.core.volume.Volume;
 import org.apache.accumulo.manager.EventCoordinator;
 import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues;
@@ -137,7 +138,11 @@ import com.google.common.collect.Sets;
 import com.google.common.net.HostAndPort;
 import com.google.common.util.concurrent.MoreExecutors;
 
-public class CompactionCoordinator implements 
CompactionCoordinatorService.Iface, Runnable {
+import io.micrometer.core.instrument.Gauge;
+import io.micrometer.core.instrument.MeterRegistry;
+
+public class CompactionCoordinator
+    implements CompactionCoordinatorService.Iface, Runnable, MetricsProducer {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(CompactionCoordinator.class);
   private static final long FIFTEEN_MINUTES = TimeUnit.MINUTES.toMillis(15);
@@ -178,16 +183,21 @@ public class CompactionCoordinator implements 
CompactionCoordinatorService.Iface
   private final Cache<Path,Integer> checked_tablet_dir_cache;
   private final DeadCompactionDetector deadCompactionDetector;
 
+  private final QueueMetrics queueMetrics;
+
   public CompactionCoordinator(ServerContext ctx, LiveTServerSet tservers,
-      SecurityOperation security, CompactionJobQueues jobQueues,
-      EventCoordinator eventCoordinator) {
+      SecurityOperation security, EventCoordinator eventCoordinator) {
     this.ctx = ctx;
     this.tserverSet = tservers;
     this.schedExecutor = this.ctx.getScheduledExecutor();
     this.security = security;
-    this.jobQueues = jobQueues;
     this.eventCoordinator = eventCoordinator;
 
+    this.jobQueues = new CompactionJobQueues(
+        
ctx.getConfiguration().getCount(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE));
+
+    this.queueMetrics = new QueueMetrics(jobQueues);
+
     var refreshLatches = new 
EnumMap<Ample.DataLevel,CountDownLatch>(Ample.DataLevel.class);
     refreshLatches.put(Ample.DataLevel.ROOT, new CountDownLatch(1));
     refreshLatches.put(Ample.DataLevel.METADATA, new CountDownLatch(1));
@@ -219,8 +229,23 @@ public class CompactionCoordinator implements 
CompactionCoordinatorService.Iface
     // At this point the manager does not have its lock so no actions should 
be taken yet
   }
 
+  private volatile Thread serviceThread = null;
+
+  public void start() {
+    serviceThread = Threads.createThread("CompactionCoordinator Thread", this);
+    serviceThread.start();
+  }
+
   public void shutdown() {
     shutdown = true;
+    var localThread = serviceThread;
+    if (localThread != null) {
+      try {
+        localThread.join();
+      } catch (InterruptedException e) {
+        LOG.error("Exception stopping compaction coordinator thread", e);
+      }
+    }
   }
 
   protected void startCompactionCleaner(ScheduledThreadPoolExecutor 
schedExecutor) {
@@ -354,19 +379,6 @@ public class CompactionCoordinator implements 
CompactionCoordinatorService.Iface
         
.getTimeInMillis(Property.COMPACTION_COORDINATOR_TSERVER_COMPACTION_CHECK_INTERVAL);
   }
 
-  /**
-   * Callback for the LiveTServerSet object to update current set of tablet 
servers, including ones
-   * that were deleted and added
-   *
-   * @param current current set of live tservers
-   * @param deleted set of tservers that were removed from current since last 
update
-   * @param added set of tservers that were added to current since last update
-   */
-  public void updateTServerSet(LiveTServerSet current, Set<TServerInstance> 
deleted,
-      Set<TServerInstance> added) {
-
-  }
-
   public long getNumRunningCompactions() {
     return RUNNING_CACHE.size();
   }
@@ -629,6 +641,24 @@ public class CompactionCoordinator implements 
CompactionCoordinatorService.Iface
         TCompactionKind.valueOf(ecm.getKind().name()), fateTxid, overrides);
   }
 
+  @Override
+  public void registerMetrics(MeterRegistry registry) {
+    Gauge.builder(METRICS_MAJC_QUEUED, jobQueues, 
CompactionJobQueues::getQueuedJobCount)
+        .description("Number of queued major compactions").register(registry);
+    Gauge.builder(METRICS_MAJC_RUNNING, this, 
CompactionCoordinator::getNumRunningCompactions)
+        .description("Number of running major compactions").register(registry);
+
+    queueMetrics.registerMetrics(registry);
+  }
+
+  public void addJobs(TabletMetadata tabletMetadata, Collection<CompactionJob> 
jobs) {
+    jobQueues.add(tabletMetadata, jobs);
+  }
+
+  public CompactionCoordinatorService.Iface getThriftService() {
+    return this;
+  }
+
   class RefreshWriter {
 
     private final ExternalCompactionId ecid;
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/QueueMetrics.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/QueueMetrics.java
similarity index 93%
rename from 
server/manager/src/main/java/org/apache/accumulo/manager/metrics/QueueMetrics.java
rename to 
server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/QueueMetrics.java
index 87925ceb10..24a9ccae5d 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/QueueMetrics.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/QueueMetrics.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.accumulo.manager.metrics;
+package org.apache.accumulo.manager.compaction.coordinator;
 
 import static org.apache.accumulo.core.metrics.MetricsUtil.getCommonTags;
 
@@ -97,7 +97,7 @@ public class QueueMetrics implements MetricsProducer {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(QueueMetrics.class);
   private static final long DEFAULT_MIN_REFRESH_DELAY = 
TimeUnit.SECONDS.toMillis(5);
-  private MeterRegistry meterRegistry = null;
+  private volatile MeterRegistry meterRegistry = null;
   private final CompactionJobQueues compactionJobQueues;
   private final Map<CompactionExecutorId,QueueMeters> perQueueMetrics = new 
HashMap<>();
   private Gauge queueCountMeter = null;
@@ -113,11 +113,14 @@ public class QueueMetrics implements MetricsProducer {
 
   public void update() {
 
+    // read the volatile variable once so the rest of the method has a consistent view
+    var localRegistry = meterRegistry;
+
     if (queueCountMeter == null) {
       queueCountMeter = Gauge
           .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUES, compactionJobQueues,
               CompactionJobQueues::getQueueCount)
-          .description("Number of current 
Queues").tags(getCommonTags()).register(meterRegistry);
+          .description("Number of current 
Queues").tags(getCommonTags()).register(localRegistry);
     }
     LOG.debug("update - cjq queues: {}", compactionJobQueues.getQueueIds());
 
@@ -131,14 +134,14 @@ public class QueueMetrics implements MetricsProducer {
         Sets.difference(definedQueues, queuesWithMetrics);
     queuesWithoutMetrics.forEach(q -> {
       LOG.debug("update - creating meters for queue: {}", q);
-      perQueueMetrics.put(q, new QueueMeters(meterRegistry, q, 
compactionJobQueues.getQueue(q)));
+      perQueueMetrics.put(q, new QueueMeters(localRegistry, q, 
compactionJobQueues.getQueue(q)));
     });
 
     SetView<CompactionExecutorId> metricsWithoutQueues =
         Sets.difference(queuesWithMetrics, definedQueues);
     metricsWithoutQueues.forEach(q -> {
       LOG.debug("update - removing meters for queue: {}", q);
-      perQueueMetrics.get(q).removeMeters(meterRegistry);
+      perQueueMetrics.get(q).removeMeters(localRegistry);
       perQueueMetrics.remove(q);
     });
 
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ManagerMetrics.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ManagerMetrics.java
index 0163e7bf62..7f31b0d70c 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ManagerMetrics.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ManagerMetrics.java
@@ -35,7 +35,6 @@ import io.micrometer.core.instrument.MeterRegistry;
 public class ManagerMetrics implements MetricsProducer {
 
   private final FateMetrics fateMetrics;
-  private final QueueMetrics queueMetrics;
 
   private AtomicLong rootTGWErrorsGauge;
   private AtomicLong metadataTGWErrorsGauge;
@@ -46,7 +45,6 @@ public class ManagerMetrics implements MetricsProducer {
     requireNonNull(conf, "Manager must not be null");
     fateMetrics = new FateMetrics(manager.getContext(),
         
conf.getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL));
-    queueMetrics = new QueueMetrics(manager.getCompactionQueues());
   }
 
   public void incrementTabletGroupWatcherError(DataLevel level) {
@@ -68,7 +66,6 @@ public class ManagerMetrics implements MetricsProducer {
   @Override
   public void registerMetrics(MeterRegistry registry) {
     fateMetrics.registerMetrics(registry);
-    queueMetrics.registerMetrics(registry);
     rootTGWErrorsGauge = registry.gauge(METRICS_MANAGER_ROOT_TGW_ERRORS,
         MetricsUtil.getCommonTags(), new AtomicLong(0));
     metadataTGWErrorsGauge = registry.gauge(METRICS_MANAGER_META_TGW_ERRORS,

Reply via email to