This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit da0e0e6624eef8c207022636a67a41bdd848fa49
Merge: a3616bf46f f3e75f0e46
Author: Christopher Tubbs <ctubb...@apache.org>
AuthorDate: Fri Mar 15 18:04:39 2024 -0400

    Merge branch '2.1'

 .../org/apache/accumulo/compactor/Compactor.java   | 44 +++++-----------------
 1 file changed, 10 insertions(+), 34 deletions(-)

diff --cc server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index b81c30dee3,5e09e48fdf..f4affca907
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@@ -106,8 -105,6 +106,7 @@@ import org.apache.accumulo.server.fs.Vo
  import org.apache.accumulo.server.rpc.ServerAddress;
  import org.apache.accumulo.server.rpc.TServerUtils;
  import org.apache.accumulo.server.rpc.ThriftProcessorTypes;
- import org.apache.accumulo.server.security.SecurityOperation;
 +import org.apache.accumulo.server.zookeeper.TransactionWatcher;
  import org.apache.hadoop.fs.Path;
  import org.apache.thrift.TException;
  import org.apache.thrift.transport.TTransportException;
@@@ -130,42 -139,23 +129,23 @@@ public class Compactor extends Abstract
  
    protected static final CompactionJobHolder JOB_HOLDER = new CompactionJobHolder();
  
 -  private final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger();
    private final UUID compactorId = UUID.randomUUID();
-   private final AccumuloConfiguration aconf;
    private final String queueName;
    protected final AtomicReference<ExternalCompactionId> currentCompactionId =
        new AtomicReference<>();
-   private final CompactionWatcher watcher;
  
-   private SecurityOperation security;
    private ServiceLock compactorLock;
    private ServerAddress compactorAddress = null;
 +  private PausedCompactionMetrics pausedMetrics;
  
    // Exposed for tests
    protected volatile boolean shutdown = false;
  
    private final AtomicBoolean compactionRunning = new AtomicBoolean(false);
  
 -  protected Compactor(CompactorServerOpts opts, String[] args) {
 +  protected Compactor(ConfigOpts opts, String[] args) {
-     this(opts, args, null);
-   }
- 
-   protected Compactor(ConfigOpts opts, String[] args, AccumuloConfiguration conf) {
      super("compactor", opts, args);
-     aconf = conf == null ? super.getConfiguration() : conf;
-     queueName = aconf.get(Property.COMPACTOR_QUEUE_NAME);
-     setupSecurity();
-     watcher = new CompactionWatcher(aconf);
-     var schedExecutor =
-         ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(aconf);
-     startCancelChecker(schedExecutor, TIME_BETWEEN_CANCEL_CHECKS);
-     printStartupMsg();
-   }
- 
-   @Override
-   public AccumuloConfiguration getConfiguration() {
-     return aconf;
 -    queueName = opts.getQueueName();
++    queueName = super.getConfiguration().get(Property.COMPACTOR_QUEUE_NAME);
    }
  
    @Override
@@@ -176,10 -165,13 +156,6 @@@
      CompactionWatcher.setTimer(timer);
    }
  
-   protected void setupSecurity() {
-     security = getContext().getSecurityOperation();
 -  protected void startGCLogger(ScheduledThreadPoolExecutor schedExecutor) {
 -    ScheduledFuture<?> future =
 -        schedExecutor.scheduleWithFixedDelay(() -> gcLogger.logGCInfo(getConfiguration()), 0,
 -            TIME_BETWEEN_GC_CHECKS, TimeUnit.MILLISECONDS);
 -    ThreadPools.watchNonCriticalScheduledTask(future);
--  }
--
    protected void startCancelChecker(ScheduledThreadPoolExecutor schedExecutor,
        long timeBetweenChecks) {
      ThreadPools.watchCriticalScheduledTask(schedExecutor.scheduleWithFixedDelay(
@@@ -625,7 -601,14 +596,12 @@@
          | SecurityException e1) {
        LOG.error("Error initializing metrics, metrics will not be emitted.", e1);
      }
 -    MetricsUtil.initializeProducers(this);
  
+     var watcher = new CompactionWatcher(getConfiguration());
+     var schedExecutor = ThreadPools.getServerThreadPools()
+         .createGeneralScheduledExecutorService(getConfiguration());
 -    startGCLogger(schedExecutor);
+     startCancelChecker(schedExecutor, TIME_BETWEEN_CANCEL_CHECKS);
+ 
      LOG.info("Compactor started, waiting for work");
      try {
  

Reply via email to