This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 4977f1277c Move code from Manager class to compaction coordinator (#4011) 4977f1277c is described below commit 4977f1277c1a8ecefefaa804ba40cabea3d90daf Author: Keith Turner <ktur...@apache.org> AuthorDate: Mon Dec 18 19:35:57 2023 -0500 Move code from Manager class to compaction coordinator (#4011) Moves code related to compaction coordination out of the manager class and into the compaction coordinator class. Co-authored-by: Daniel Roberts <ddani...@gmail.com> --- .../java/org/apache/accumulo/manager/Manager.java | 42 +++----------- .../accumulo/manager/TabletGroupWatcher.java | 2 +- .../coordinator/CompactionCoordinator.java | 66 ++++++++++++++++------ .../coordinator}/QueueMetrics.java | 13 +++-- .../accumulo/manager/metrics/ManagerMetrics.java | 3 - 5 files changed, 66 insertions(+), 60 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index b23e0a6bdb..2418dba1fe 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -113,7 +113,6 @@ import org.apache.accumulo.core.util.Retry; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.manager.compaction.coordinator.CompactionCoordinator; -import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues; import org.apache.accumulo.manager.metrics.ManagerMetrics; import org.apache.accumulo.manager.recovery.RecoveryManager; import org.apache.accumulo.manager.split.Splitter; @@ -162,7 +161,6 @@ import com.google.common.util.concurrent.RateLimiter; import com.google.common.util.concurrent.Uninterruptibles; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.MeterRegistry; import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Scope; @@ -538,16 +536,14 @@ public class Manager extends AbstractServer return splitter; } - private CompactionJobQueues compactionJobQueues; - - public CompactionJobQueues getCompactionQueues() { - return compactionJobQueues; - } - public UpgradeCoordinator.UpgradeStatus getUpgradeStatus() { return upgradeCoordinator.getStatus(); } + public CompactionCoordinator getCompactionCoordinator() { + return compactionCoordinator; + } + private class MigrationCleanupThread implements Runnable { @Override @@ -932,17 +928,13 @@ public class Manager extends AbstractServer final ServerContext context = getContext(); final String zroot = getZooKeeperRoot(); - this.compactionJobQueues = new CompactionJobQueues( - getConfiguration().getCount(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE)); - // ACCUMULO-4424 Put up the Thrift servers before getting the lock as a sign of process health // when a hot-standby // // Start the Manager's Fate Service fateServiceHandler = new FateServiceHandler(this); managerClientHandler = new ManagerClientServiceHandler(this); - compactionCoordinator = - new CompactionCoordinator(context, tserverSet, security, compactionJobQueues, nextEvent); + compactionCoordinator = new CompactionCoordinator(context, tserverSet, security, nextEvent); // Start the Manager's Client service // Ensure that calls before the manager gets the lock fail ManagerClientService.Iface haProxy = @@ -950,7 +942,7 @@ public class Manager extends AbstractServer ServerAddress sa; var processor = ThriftProcessorTypes.getManagerTProcessor(fateServiceHandler, - compactionCoordinator, haProxy, getContext()); + compactionCoordinator.getThriftService(), haProxy, getContext()); try { sa = TServerUtils.startServer(context, getHostname(), Property.MANAGER_CLIENTPORT, processor, @@ -1008,10 +1000,8 @@ public class Manager extends AbstractServer Thread.currentThread().interrupt(); } - // Don't call run on the CompactionCoordinator until we have tservers. - Thread compactionCoordinatorThread = - Threads.createThread("CompactionCoordinator Thread", compactionCoordinator); - compactionCoordinatorThread.start(); + // Don't call start the CompactionCoordinator until we have tservers. + compactionCoordinator.start(); ZooReaderWriter zReaderWriter = context.getZooReaderWriter(); @@ -1163,11 +1153,6 @@ public class Manager extends AbstractServer tableInformationStatusPool.shutdownNow(); compactionCoordinator.shutdown(); - try { - compactionCoordinatorThread.join(); - } catch (InterruptedException e) { - log.error("Exception compaction coordinator thread", e); - } // Signal that we want it to stop, and wait for it to do so. if (authenticationTokenKeyManager != null) { @@ -1389,8 +1374,6 @@ public class Manager extends AbstractServer public void update(LiveTServerSet current, Set<TServerInstance> deleted, Set<TServerInstance> added) { - compactionCoordinator.updateTServerSet(current, deleted, added); - // if we have deleted or added tservers, then adjust our dead server list if (!deleted.isEmpty() || !added.isEmpty()) { DeadServerList obit = new DeadServerList(getContext()); @@ -1671,13 +1654,6 @@ public class Manager extends AbstractServer @Override public void registerMetrics(MeterRegistry registry) { super.registerMetrics(registry); - Gauge.builder(METRICS_MAJC_QUEUED, compactionJobQueues, CompactionJobQueues::getQueuedJobCount) - .description("Number of queued major compactions").register(registry); - Gauge - .builder(METRICS_MAJC_RUNNING, compactionCoordinator, - CompactionCoordinator::getNumRunningCompactions) - .description("Number of running major compactions").register(registry); - + compactionCoordinator.registerMetrics(registry); } - } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java index a217f870b7..3d9e840af2 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java @@ -534,7 +534,7 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { var jobs = compactionGenerator.generateJobs(tm, TabletManagementIterator.determineCompactionKinds(actions)); LOG.debug("{} may need compacting adding {} jobs", tm.getExtent(), jobs.size()); - manager.getCompactionQueues().add(tm, jobs); + manager.getCompactionCoordinator().addJobs(tm, jobs); } // ELASITICITY_TODO the case where a planner generates compactions at time T1 for tablet diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index d020c1d662..6d2a51c46f 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -82,7 +82,6 @@ import org.apache.accumulo.core.metadata.AbstractTabletFile; import org.apache.accumulo.core.metadata.CompactableFileImpl; import org.apache.accumulo.core.metadata.ReferencedTabletFile; import org.apache.accumulo.core.metadata.StoredTabletFile; -import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.Ample.Refreshes.RefreshEntry; import org.apache.accumulo.core.metadata.schema.Ample.RejectionHandler; @@ -92,6 +91,7 @@ import org.apache.accumulo.core.metadata.schema.ExternalCompactionMetadata; import org.apache.accumulo.core.metadata.schema.SelectedFiles; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; +import org.apache.accumulo.core.metrics.MetricsProducer; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.securityImpl.thrift.TCredentials; import org.apache.accumulo.core.spi.compaction.CompactionJob; @@ -109,6 +109,7 @@ import org.apache.accumulo.core.util.compaction.CompactionExecutorIdImpl; import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; import org.apache.accumulo.core.util.compaction.RunningCompaction; import org.apache.accumulo.core.util.threads.ThreadPools; +import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.core.volume.Volume; import org.apache.accumulo.manager.EventCoordinator; import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues; @@ -137,7 +138,11 @@ import com.google.common.collect.Sets; import com.google.common.net.HostAndPort; import com.google.common.util.concurrent.MoreExecutors; -public class CompactionCoordinator implements CompactionCoordinatorService.Iface, Runnable { +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.MeterRegistry; + +public class CompactionCoordinator + implements CompactionCoordinatorService.Iface, Runnable, MetricsProducer { private static final Logger LOG = LoggerFactory.getLogger(CompactionCoordinator.class); private static final long FIFTEEN_MINUTES = TimeUnit.MINUTES.toMillis(15); @@ -178,16 +183,21 @@ public class CompactionCoordinator implements CompactionCoordinatorService.Iface private final Cache<Path,Integer> checked_tablet_dir_cache; private final DeadCompactionDetector deadCompactionDetector; + private final QueueMetrics queueMetrics; + public CompactionCoordinator(ServerContext ctx, LiveTServerSet tservers, - SecurityOperation security, CompactionJobQueues jobQueues, - EventCoordinator eventCoordinator) { + SecurityOperation security, EventCoordinator eventCoordinator) { this.ctx = ctx; this.tserverSet = tservers; this.schedExecutor = this.ctx.getScheduledExecutor(); this.security = security; - this.jobQueues = jobQueues; this.eventCoordinator = eventCoordinator; + this.jobQueues = new CompactionJobQueues( + ctx.getConfiguration().getCount(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE)); + + this.queueMetrics = new QueueMetrics(jobQueues); + var refreshLatches = new EnumMap<Ample.DataLevel,CountDownLatch>(Ample.DataLevel.class); refreshLatches.put(Ample.DataLevel.ROOT, new CountDownLatch(1)); refreshLatches.put(Ample.DataLevel.METADATA, new CountDownLatch(1)); @@ -219,8 +229,23 @@ public class CompactionCoordinator implements CompactionCoordinatorService.Iface // At this point the manager does not have its lock so no actions should be taken yet } + private volatile Thread serviceThread = null; + + public void start() { + serviceThread = Threads.createThread("CompactionCoordinator Thread", this); + serviceThread.start(); + } + public void shutdown() { shutdown = true; + var localThread = serviceThread; + if (localThread != null) { + try { + localThread.join(); + } catch (InterruptedException e) { + LOG.error("Exception stopping compaction coordinator thread", e); + } + } } protected void startCompactionCleaner(ScheduledThreadPoolExecutor schedExecutor) { @@ -354,19 +379,6 @@ public class CompactionCoordinator implements CompactionCoordinatorService.Iface .getTimeInMillis(Property.COMPACTION_COORDINATOR_TSERVER_COMPACTION_CHECK_INTERVAL); } - /** - * Callback for the LiveTServerSet object to update current set of tablet servers, including ones - * that were deleted and added - * - * @param current current set of live tservers - * @param deleted set of tservers that were removed from current since last update - * @param added set of tservers that were added to current since last update - */ - public void updateTServerSet(LiveTServerSet current, Set<TServerInstance> deleted, - Set<TServerInstance> added) { - - } - public long getNumRunningCompactions() { return RUNNING_CACHE.size(); } @@ -629,6 +641,24 @@ public class CompactionCoordinator implements CompactionCoordinatorService.Iface TCompactionKind.valueOf(ecm.getKind().name()), fateTxid, overrides); } + @Override + public void registerMetrics(MeterRegistry registry) { + Gauge.builder(METRICS_MAJC_QUEUED, jobQueues, CompactionJobQueues::getQueuedJobCount) + .description("Number of queued major compactions").register(registry); + Gauge.builder(METRICS_MAJC_RUNNING, this, CompactionCoordinator::getNumRunningCompactions) + .description("Number of running major compactions").register(registry); + + queueMetrics.registerMetrics(registry); + } + + public void addJobs(TabletMetadata tabletMetadata, Collection<CompactionJob> jobs) { + jobQueues.add(tabletMetadata, jobs); + } + + public CompactionCoordinatorService.Iface getThriftService() { + return this; + } + class RefreshWriter { private final ExternalCompactionId ecid; diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/QueueMetrics.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/QueueMetrics.java similarity index 93% rename from server/manager/src/main/java/org/apache/accumulo/manager/metrics/QueueMetrics.java rename to server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/QueueMetrics.java index 87925ceb10..24a9ccae5d 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/QueueMetrics.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/QueueMetrics.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.accumulo.manager.metrics; +package org.apache.accumulo.manager.compaction.coordinator; import static org.apache.accumulo.core.metrics.MetricsUtil.getCommonTags; @@ -97,7 +97,7 @@ public class QueueMetrics implements MetricsProducer { private static final Logger LOG = LoggerFactory.getLogger(QueueMetrics.class); private static final long DEFAULT_MIN_REFRESH_DELAY = TimeUnit.SECONDS.toMillis(5); - private MeterRegistry meterRegistry = null; + private volatile MeterRegistry meterRegistry = null; private final CompactionJobQueues compactionJobQueues; private final Map<CompactionExecutorId,QueueMeters> perQueueMetrics = new HashMap<>(); private Gauge queueCountMeter = null; @@ -113,11 +113,14 @@ public class QueueMetrics implements MetricsProducer { public void update() { + // read the volatile variable once so the rest of the method has consistent view + var localRegistry = meterRegistry; + if (queueCountMeter == null) { queueCountMeter = Gauge .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUES, compactionJobQueues, CompactionJobQueues::getQueueCount) - .description("Number of current Queues").tags(getCommonTags()).register(meterRegistry); + .description("Number of current Queues").tags(getCommonTags()).register(localRegistry); } LOG.debug("update - cjq queues: {}", compactionJobQueues.getQueueIds()); @@ -131,14 +134,14 @@ public class QueueMetrics implements MetricsProducer { Sets.difference(definedQueues, queuesWithMetrics); queuesWithoutMetrics.forEach(q -> { LOG.debug("update - creating meters for queue: {}", q); - perQueueMetrics.put(q, new QueueMeters(meterRegistry, q, compactionJobQueues.getQueue(q))); + perQueueMetrics.put(q, new QueueMeters(localRegistry, q, compactionJobQueues.getQueue(q))); }); SetView<CompactionExecutorId> metricsWithoutQueues = Sets.difference(queuesWithMetrics, definedQueues); metricsWithoutQueues.forEach(q -> { LOG.debug("update - removing meters for queue: {}", q); - perQueueMetrics.get(q).removeMeters(meterRegistry); + perQueueMetrics.get(q).removeMeters(localRegistry); perQueueMetrics.remove(q); }); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ManagerMetrics.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ManagerMetrics.java index 0163e7bf62..7f31b0d70c 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ManagerMetrics.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ManagerMetrics.java @@ -35,7 +35,6 @@ import io.micrometer.core.instrument.MeterRegistry; public class ManagerMetrics implements MetricsProducer { private final FateMetrics fateMetrics; - private final QueueMetrics queueMetrics; private AtomicLong rootTGWErrorsGauge; private AtomicLong metadataTGWErrorsGauge; @@ -46,7 +45,6 @@ public class ManagerMetrics implements MetricsProducer { requireNonNull(conf, "Manager must not be null"); fateMetrics = new FateMetrics(manager.getContext(), conf.getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL)); - queueMetrics = new QueueMetrics(manager.getCompactionQueues()); } public void incrementTabletGroupWatcherError(DataLevel level) { @@ -68,7 +66,6 @@ public class ManagerMetrics implements MetricsProducer { @Override public void registerMetrics(MeterRegistry registry) { fateMetrics.registerMetrics(registry); - queueMetrics.registerMetrics(registry); rootTGWErrorsGauge = registry.gauge(METRICS_MANAGER_ROOT_TGW_ERRORS, MetricsUtil.getCommonTags(), new AtomicLong(0)); metadataTGWErrorsGauge = registry.gauge(METRICS_MANAGER_META_TGW_ERRORS,