This is an automated email from the ASF dual-hosted git repository. ctubbsii pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit da0e0e6624eef8c207022636a67a41bdd848fa49 Merge: a3616bf46f f3e75f0e46 Author: Christopher Tubbs <ctubb...@apache.org> AuthorDate: Fri Mar 15 18:04:39 2024 -0400 Merge branch '2.1' .../org/apache/accumulo/compactor/Compactor.java | 44 +++++----------------- 1 file changed, 10 insertions(+), 34 deletions(-) diff --cc server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index b81c30dee3,5e09e48fdf..f4affca907 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@@ -106,8 -105,6 +106,7 @@@ import org.apache.accumulo.server.fs.Vo import org.apache.accumulo.server.rpc.ServerAddress; import org.apache.accumulo.server.rpc.TServerUtils; import org.apache.accumulo.server.rpc.ThriftProcessorTypes; - import org.apache.accumulo.server.security.SecurityOperation; +import org.apache.accumulo.server.zookeeper.TransactionWatcher; import org.apache.hadoop.fs.Path; import org.apache.thrift.TException; import org.apache.thrift.transport.TTransportException; @@@ -130,42 -139,23 +129,23 @@@ public class Compactor extends Abstract protected static final CompactionJobHolder JOB_HOLDER = new CompactionJobHolder(); - private final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger(); private final UUID compactorId = UUID.randomUUID(); - private final AccumuloConfiguration aconf; private final String queueName; protected final AtomicReference<ExternalCompactionId> currentCompactionId = new AtomicReference<>(); - private final CompactionWatcher watcher; - private SecurityOperation security; private ServiceLock compactorLock; private ServerAddress compactorAddress = null; + private PausedCompactionMetrics pausedMetrics; // Exposed for tests protected volatile boolean shutdown = false; private final AtomicBoolean compactionRunning = new AtomicBoolean(false); - protected Compactor(CompactorServerOpts opts, String[] args) { + protected Compactor(ConfigOpts opts, String[] args) { - this(opts, args, null); - } - - protected Compactor(ConfigOpts opts, String[] args, AccumuloConfiguration conf) { super("compactor", opts, args); - aconf = conf == null ? super.getConfiguration() : conf; - queueName = aconf.get(Property.COMPACTOR_QUEUE_NAME); - setupSecurity(); - watcher = new CompactionWatcher(aconf); - var schedExecutor = - ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(aconf); - startCancelChecker(schedExecutor, TIME_BETWEEN_CANCEL_CHECKS); - printStartupMsg(); - } - - @Override - public AccumuloConfiguration getConfiguration() { - return aconf; - queueName = opts.getQueueName(); ++ queueName = super.getConfiguration().get(Property.COMPACTOR_QUEUE_NAME); } @Override @@@ -176,10 -165,13 +156,6 @@@ CompactionWatcher.setTimer(timer); } - protected void setupSecurity() { - security = getContext().getSecurityOperation(); - protected void startGCLogger(ScheduledThreadPoolExecutor schedExecutor) { - ScheduledFuture<?> future = - schedExecutor.scheduleWithFixedDelay(() -> gcLogger.logGCInfo(getConfiguration()), 0, - TIME_BETWEEN_GC_CHECKS, TimeUnit.MILLISECONDS); - ThreadPools.watchNonCriticalScheduledTask(future); -- } -- protected void startCancelChecker(ScheduledThreadPoolExecutor schedExecutor, long timeBetweenChecks) { ThreadPools.watchCriticalScheduledTask(schedExecutor.scheduleWithFixedDelay( @@@ -625,7 -601,14 +596,12 @@@ | SecurityException e1) { LOG.error("Error initializing metrics, metrics will not be emitted.", e1); } - MetricsUtil.initializeProducers(this); + var watcher = new CompactionWatcher(getConfiguration()); + var schedExecutor = ThreadPools.getServerThreadPools() + .createGeneralScheduledExecutorService(getConfiguration()); - startGCLogger(schedExecutor); + startCancelChecker(schedExecutor, TIME_BETWEEN_CANCEL_CHECKS); + LOG.info("Compactor started, waiting for work"); try {