This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new f3e75f0e46 Several small improvements to Compactor impl (#4342)
f3e75f0e46 is described below

commit f3e75f0e46a320d10f222546c33ada896c371e7c
Author: Christopher Tubbs <ctubb...@apache.org>
AuthorDate: Fri Mar 15 16:53:53 2024 -0400

    Several small improvements to Compactor impl (#4342)
    
    * Start threadpools when service is run, not when it is constructed (and
      importantly, after metrics have been fully initialized); this ensures
      that ServerContext, which is created during construction, is available
      before threadpools and metrics are started
    * Use the security object from the ServerContext
    * Remove unneeded second constructor with config overrides (can provide
      overridden config in test subclass instead)
    * Remove redundant printStartupMsg (already printed the same details in
      AbstractServer base class)
    * Make sure SuccessfulCompactor's close method is called in
      try-with-resources to remove unclosed resource warning in
      CompactorTest
---
 .../org/apache/accumulo/compactor/Compactor.java   | 44 +++++-----------------
 .../apache/accumulo/compactor/CompactorTest.java   | 27 ++++---------
 2 files changed, 18 insertions(+), 53 deletions(-)

diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 1434d2a1f1..5e09e48fdf 100644
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -105,7 +105,6 @@ import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.rpc.ServerAddress;
 import org.apache.accumulo.server.rpc.TServerUtils;
 import org.apache.accumulo.server.rpc.ThriftProcessorTypes;
-import org.apache.accumulo.server.security.SecurityOperation;
 import org.apache.hadoop.fs.Path;
 import org.apache.thrift.TException;
 import org.apache.thrift.transport.TTransportException;
@@ -142,13 +141,10 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac
 
   private final GarbageCollectionLogger gcLogger = new 
GarbageCollectionLogger();
   private final UUID compactorId = UUID.randomUUID();
-  private final AccumuloConfiguration aconf;
   private final String queueName;
   protected final AtomicReference<ExternalCompactionId> currentCompactionId =
       new AtomicReference<>();
-  private final CompactionWatcher watcher;
 
-  private SecurityOperation security;
   private ServiceLock compactorLock;
   private ServerAddress compactorAddress = null;
 
@@ -158,25 +154,8 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac
   private final AtomicBoolean compactionRunning = new AtomicBoolean(false);
 
   protected Compactor(CompactorServerOpts opts, String[] args) {
-    this(opts, args, null);
-  }
-
-  protected Compactor(CompactorServerOpts opts, String[] args, 
AccumuloConfiguration conf) {
     super("compactor", opts, args);
     queueName = opts.getQueueName();
-    aconf = conf == null ? super.getConfiguration() : conf;
-    setupSecurity();
-    watcher = new CompactionWatcher(aconf);
-    var schedExecutor =
-        
ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(aconf);
-    startGCLogger(schedExecutor);
-    startCancelChecker(schedExecutor, TIME_BETWEEN_CANCEL_CHECKS);
-    printStartupMsg();
-  }
-
-  @Override
-  public AccumuloConfiguration getConfiguration() {
-    return aconf;
   }
 
   @Override
@@ -186,10 +165,6 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac
     CompactionWatcher.setTimer(timer);
   }
 
-  protected void setupSecurity() {
-    security = getContext().getSecurityOperation();
-  }
-
   protected void startGCLogger(ScheduledThreadPoolExecutor schedExecutor) {
     ScheduledFuture<?> future =
         schedExecutor.scheduleWithFixedDelay(() -> 
gcLogger.logGCInfo(getConfiguration()), 0,
@@ -245,11 +220,6 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac
     }
   }
 
-  protected void printStartupMsg() {
-    LOG.info("Version " + Constants.VERSION);
-    LOG.info("Instance " + getContext().getInstanceID());
-  }
-
   /**
    * Set up nodes and locks in ZooKeeper for this Compactor
    *
@@ -359,7 +329,7 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac
     TableId tableId = JOB_HOLDER.getTableId();
     try {
       NamespaceId nsId = getContext().getNamespaceId(tableId);
-      if (!security.canCompact(credentials, tableId, nsId)) {
+      if (!getContext().getSecurityOperation().canCompact(credentials, 
tableId, nsId)) {
         throw new AccumuloSecurityException(credentials.getPrincipal(),
             SecurityErrorCode.PERMISSION_DENIED).asThriftException();
       }
@@ -633,6 +603,12 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac
     }
     MetricsUtil.initializeProducers(this);
 
+    var watcher = new CompactionWatcher(getConfiguration());
+    var schedExecutor = ThreadPools.getServerThreadPools()
+        .createGeneralScheduledExecutorService(getConfiguration());
+    startGCLogger(schedExecutor);
+    startCancelChecker(schedExecutor, TIME_BETWEEN_CANCEL_CHECKS);
+
     LOG.info("Compactor started, waiting for work");
     try {
 
@@ -831,7 +807,7 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac
   @Override
   public List<ActiveCompaction> getActiveCompactions(TInfo tinfo, TCredentials 
credentials)
       throws ThriftSecurityException, TException {
-    if (!security.canPerformSystemActions(credentials)) {
+    if 
(!getContext().getSecurityOperation().canPerformSystemActions(credentials)) {
       throw new AccumuloSecurityException(credentials.getPrincipal(),
           SecurityErrorCode.PERMISSION_DENIED).asThriftException();
     }
@@ -858,7 +834,7 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac
   public TExternalCompactionJob getRunningCompaction(TInfo tinfo, TCredentials 
credentials)
       throws ThriftSecurityException, TException {
     // do not expect users to call this directly, expect other tservers to 
call this method
-    if (!security.canPerformSystemActions(credentials)) {
+    if 
(!getContext().getSecurityOperation().canPerformSystemActions(credentials)) {
       throw new AccumuloSecurityException(credentials.getPrincipal(),
           SecurityErrorCode.PERMISSION_DENIED).asThriftException();
     }
@@ -883,7 +859,7 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac
   public String getRunningCompactionId(TInfo tinfo, TCredentials credentials)
       throws ThriftSecurityException, TException {
     // do not expect users to call this directly, expect other tservers to 
call this method
-    if (!security.canPerformSystemActions(credentials)) {
+    if 
(!getContext().getSecurityOperation().canPerformSystemActions(credentials)) {
       throw new AccumuloSecurityException(credentials.getPrincipal(),
           SecurityErrorCode.PERMISSION_DENIED).asThriftException();
     }
diff --git a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
index 2b8fa20b7c..f7e979820e 100644
--- a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
+++ b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
@@ -36,7 +36,6 @@ import java.util.function.Supplier;
 
 import org.apache.accumulo.core.compaction.thrift.TCompactionState;
 import org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.ConfigurationCopy;
 import org.apache.accumulo.core.conf.DefaultConfiguration;
 import org.apache.accumulo.core.conf.Property;
@@ -176,7 +175,7 @@ public class CompactorTest {
 
     SuccessfulCompactor(Supplier<UUID> uuid, ServerAddress address, 
TExternalCompactionJob job,
         ServerContext context, ExternalCompactionId eci) {
-      super(new CompactorServerOpts(), new String[] {"-q", "testQ"}, 
context.getConfiguration());
+      super(new CompactorServerOpts(), new String[] {"-q", "testQ"});
       this.uuid = uuid;
       this.address = address;
       this.job = job;
@@ -184,20 +183,9 @@ public class CompactorTest {
       this.eci = eci;
     }
 
-    @Override
-    public AccumuloConfiguration getConfiguration() {
-      return context.getConfiguration();
-    }
-
-    @Override
-    protected void setupSecurity() {}
-
     @Override
     protected void startGCLogger(ScheduledThreadPoolExecutor schedExecutor) {}
 
-    @Override
-    protected void printStartupMsg() {}
-
     @Override
     public ServerContext getContext() {
       return this.context;
@@ -469,13 +457,14 @@ public class CompactorTest {
 
     PowerMock.replayAll();
 
-    SuccessfulCompactor c = new SuccessfulCompactor(null, null, null, context, 
null);
-    PowerMock.verifyAll();
+    try (var c = new SuccessfulCompactor(null, null, null, context, null)) {
+      Long maxWait = c.getWaitTimeBetweenCompactionChecks();
+      // compaction jitter means maxWait is between 0.9 and 1.1 of the desired 
value.
+      assertTrue(maxWait >= 720L);
+      assertTrue(maxWait <= 968L);
+    }
 
-    Long maxWait = c.getWaitTimeBetweenCompactionChecks();
-    // compaction jitter means maxWait is between 0.9 and 1.1 of the desired 
value.
-    assertTrue(maxWait >= 720L);
-    assertTrue(maxWait <= 968L);
+    PowerMock.verifyAll();
   }
 
 }

Reply via email to