This is an automated email from the ASF dual-hosted git repository. ctubbsii pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit d5d123aaa4d601a4f5d618ad96f325a601e21293 Merge: b9867c4c8c da0e0e6624 Author: Christopher Tubbs <ctubb...@apache.org> AuthorDate: Fri Mar 15 19:39:59 2024 -0400 Merge branch 'main' into elasticity .../org/apache/accumulo/compactor/Compactor.java | 44 +++++----------------- 1 file changed, 10 insertions(+), 34 deletions(-) diff --cc server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 114dd59519,f4affca907..db21c2605e --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@@ -108,8 -106,7 +108,7 @@@ import org.apache.accumulo.server.fs.Vo import org.apache.accumulo.server.rpc.ServerAddress; import org.apache.accumulo.server.rpc.TServerUtils; import org.apache.accumulo.server.rpc.ThriftProcessorTypes; - import org.apache.accumulo.server.security.SecurityOperation; -import org.apache.accumulo.server.zookeeper.TransactionWatcher; +import org.apache.accumulo.tserver.log.LogSorter; import org.apache.hadoop.fs.Path; import org.apache.thrift.TException; import org.apache.thrift.transport.TTransportException; @@@ -132,12 -130,10 +131,9 @@@ public class Compactor extends Abstract protected static final CompactionJobHolder JOB_HOLDER = new CompactionJobHolder(); private final UUID compactorId = UUID.randomUUID(); - private final AccumuloConfiguration aconf; - private final String queueName; protected final AtomicReference<ExternalCompactionId> currentCompactionId = new AtomicReference<>(); - private final CompactionWatcher watcher; - private SecurityOperation security; private ServiceLock compactorLock; private ServerAddress compactorAddress = null; private PausedCompactionMetrics pausedMetrics; @@@ -148,31 -144,10 +144,14 @@@ private final AtomicBoolean compactionRunning = new AtomicBoolean(false); protected Compactor(ConfigOpts opts, String[] args) { - this(opts, args, null); - } - - protected Compactor(ConfigOpts opts, String[] args, AccumuloConfiguration conf) { super("compactor", opts, args); - aconf = conf == null ? super.getConfiguration() : conf; - setupSecurity(); - watcher = new CompactionWatcher(aconf); - var schedExecutor = - ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(aconf); - startCancelChecker(schedExecutor, - aconf.getTimeInMillis(Property.COMPACTOR_CANCEL_CHECK_INTERVAL)); - printStartupMsg(); - queueName = super.getConfiguration().get(Property.COMPACTOR_QUEUE_NAME); + } + + @Override + protected String getResourceGroupPropertyValue(SiteConfiguration conf) { + return conf.get(Property.COMPACTOR_GROUP_NAME); } - @Override - public AccumuloConfiguration getConfiguration() { - return aconf; - } - @Override public void registerMetrics(MeterRegistry registry) { super.registerMetrics(registry); @@@ -634,6 -597,11 +604,12 @@@ LOG.error("Error initializing metrics, metrics will not be emitted.", e1); } + var watcher = new CompactionWatcher(getConfiguration()); + var schedExecutor = ThreadPools.getServerThreadPools() + .createGeneralScheduledExecutorService(getConfiguration()); - startCancelChecker(schedExecutor, TIME_BETWEEN_CANCEL_CHECKS); ++ startCancelChecker(schedExecutor, ++ getConfiguration().getTimeInMillis(Property.COMPACTOR_CANCEL_CHECK_INTERVAL)); + LOG.info("Compactor started, waiting for work"); try {