This is an automated email from the ASF dual-hosted git repository. ctubbsii pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new f3e75f0e46 Several small improvements to Compactor impl (#4342) f3e75f0e46 is described below commit f3e75f0e46a320d10f222546c33ada896c371e7c Author: Christopher Tubbs <ctubb...@apache.org> AuthorDate: Fri Mar 15 16:53:53 2024 -0400 Several small improvements to Compactor impl (#4342) * Start threadpools when service is run, not when it is constructed (and importantly, after metrics have been fully initialized); this ensures that ServerContext, which is created during construction, is available before threadpools and metrics are started * Use the security object from the ServerContext * Remove unneeded second constructor with config overrides (can provide overridden config in test subclass instead) * Remove redundant printStartupMsg (already printed the same details in AbstractServer base class) * Make sure SuccessfulCompactor's close method is called in try-with-resources to remove unclosed resource warning in CompactorTest --- .../org/apache/accumulo/compactor/Compactor.java | 44 +++++----------------- .../apache/accumulo/compactor/CompactorTest.java | 27 ++++--------- 2 files changed, 18 insertions(+), 53 deletions(-) diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 1434d2a1f1..5e09e48fdf 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -105,7 +105,6 @@ import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.rpc.ServerAddress; import org.apache.accumulo.server.rpc.TServerUtils; import org.apache.accumulo.server.rpc.ThriftProcessorTypes; -import org.apache.accumulo.server.security.SecurityOperation; import org.apache.hadoop.fs.Path; import org.apache.thrift.TException; import org.apache.thrift.transport.TTransportException; @@ -142,13 +141,10 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac private final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger(); private final UUID compactorId = UUID.randomUUID(); - private final AccumuloConfiguration aconf; private final String queueName; protected final AtomicReference<ExternalCompactionId> currentCompactionId = new AtomicReference<>(); - private final CompactionWatcher watcher; - private SecurityOperation security; private ServiceLock compactorLock; private ServerAddress compactorAddress = null; @@ -158,25 +154,8 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac private final AtomicBoolean compactionRunning = new AtomicBoolean(false); protected Compactor(CompactorServerOpts opts, String[] args) { - this(opts, args, null); - } - - protected Compactor(CompactorServerOpts opts, String[] args, AccumuloConfiguration conf) { super("compactor", opts, args); queueName = opts.getQueueName(); - aconf = conf == null ? super.getConfiguration() : conf; - setupSecurity(); - watcher = new CompactionWatcher(aconf); - var schedExecutor = - ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(aconf); - startGCLogger(schedExecutor); - startCancelChecker(schedExecutor, TIME_BETWEEN_CANCEL_CHECKS); - printStartupMsg(); - } - - @Override - public AccumuloConfiguration getConfiguration() { - return aconf; } @Override @@ -186,10 +165,6 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac CompactionWatcher.setTimer(timer); } - protected void setupSecurity() { - security = getContext().getSecurityOperation(); - } - protected void startGCLogger(ScheduledThreadPoolExecutor schedExecutor) { ScheduledFuture<?> future = schedExecutor.scheduleWithFixedDelay(() -> gcLogger.logGCInfo(getConfiguration()), 0, @@ -245,11 +220,6 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac } } - protected void printStartupMsg() { - LOG.info("Version " + Constants.VERSION); - LOG.info("Instance " + getContext().getInstanceID()); - } - /** * Set up nodes and locks in ZooKeeper for this Compactor * @@ -359,7 +329,7 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac TableId tableId = JOB_HOLDER.getTableId(); try { NamespaceId nsId = getContext().getNamespaceId(tableId); - if (!security.canCompact(credentials, tableId, nsId)) { + if (!getContext().getSecurityOperation().canCompact(credentials, tableId, nsId)) { throw new AccumuloSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED).asThriftException(); } @@ -633,6 +603,12 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac } MetricsUtil.initializeProducers(this); + var watcher = new CompactionWatcher(getConfiguration()); + var schedExecutor = ThreadPools.getServerThreadPools() + .createGeneralScheduledExecutorService(getConfiguration()); + startGCLogger(schedExecutor); + startCancelChecker(schedExecutor, TIME_BETWEEN_CANCEL_CHECKS); + LOG.info("Compactor started, waiting for work"); try { @@ -831,7 +807,7 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac @Override public List<ActiveCompaction> getActiveCompactions(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException { - if (!security.canPerformSystemActions(credentials)) { + if (!getContext().getSecurityOperation().canPerformSystemActions(credentials)) { throw new AccumuloSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED).asThriftException(); } @@ -858,7 +834,7 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac public TExternalCompactionJob getRunningCompaction(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException { // do not expect users to call this directly, expect other tservers to call this method - if (!security.canPerformSystemActions(credentials)) { + if (!getContext().getSecurityOperation().canPerformSystemActions(credentials)) { throw new AccumuloSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED).asThriftException(); } @@ -883,7 +859,7 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac public String getRunningCompactionId(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException { // do not expect users to call this directly, expect other tservers to call this method - if (!security.canPerformSystemActions(credentials)) { + if (!getContext().getSecurityOperation().canPerformSystemActions(credentials)) { throw new AccumuloSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED).asThriftException(); } diff --git a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java index 2b8fa20b7c..f7e979820e 100644 --- a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java +++ b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java @@ -36,7 +36,6 @@ import java.util.function.Supplier; import org.apache.accumulo.core.compaction.thrift.TCompactionState; import org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate; -import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.conf.Property; @@ -176,7 +175,7 @@ public class CompactorTest { SuccessfulCompactor(Supplier<UUID> uuid, ServerAddress address, TExternalCompactionJob job, ServerContext context, ExternalCompactionId eci) { - super(new CompactorServerOpts(), new String[] {"-q", "testQ"}, context.getConfiguration()); + super(new CompactorServerOpts(), new String[] {"-q", "testQ"}); this.uuid = uuid; this.address = address; this.job = job; @@ -184,20 +183,9 @@ public class CompactorTest { this.eci = eci; } - @Override - public AccumuloConfiguration getConfiguration() { - return context.getConfiguration(); - } - - @Override - protected void setupSecurity() {} - @Override protected void startGCLogger(ScheduledThreadPoolExecutor schedExecutor) {} - @Override - protected void printStartupMsg() {} - @Override public ServerContext getContext() { return this.context; @@ -469,13 +457,14 @@ public class CompactorTest { PowerMock.replayAll(); - SuccessfulCompactor c = new SuccessfulCompactor(null, null, null, context, null); - PowerMock.verifyAll(); + try (var c = new SuccessfulCompactor(null, null, null, context, null)) { + Long maxWait = c.getWaitTimeBetweenCompactionChecks(); + // compaction jitter means maxWait is between 0.9 and 1.1 of the desired value. + assertTrue(maxWait >= 720L); + assertTrue(maxWait <= 968L); + } - Long maxWait = c.getWaitTimeBetweenCompactionChecks(); - // compaction jitter means maxWait is between 0.9 and 1.1 of the desired value. - assertTrue(maxWait >= 720L); - assertTrue(maxWait <= 968L); + PowerMock.verifyAll(); } }