This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit fabecbd856ccde226f776f6af493ae9b531338f1
Merge: 12166db314 9414041323
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Tue Jun 18 14:31:54 2024 -0400

    Merge branch 'main' into elasticity

 .../apache/accumulo/core/util/cache/Caches.java    |   1 +
 .../util/compaction/ExternalCompactionUtil.java    |   9 +
 .../thrift/CompactionCoordinatorService.java       |  44 +-
 .../core/compaction/thrift/TNextCompactionJob.java | 511 +++++++++++++++++++++
 core/src/main/thrift/compaction-coordinator.thrift |   9 +-
 .../org/apache/accumulo/compactor/Compactor.java   |  15 +-
 .../coordinator/CompactionCoordinator.java         |  15 +-
 .../compaction/CompactionCoordinatorTest.java      |  15 +-
 8 files changed, 582 insertions(+), 37 deletions(-)
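
The substantive change merged here: the coordinator's getCompactionJob RPC now
returns a TNextCompactionJob that pairs the job with the number of compactors
in the requesting group, which the Compactor uses to scale its polling
back-off. A minimal sketch of the new call shape on the compactor side,
assuming only the accessors visible in the hunks below (the coordinatorClient
handle is hypothetical; the real Compactor goes through getNextJob()):

    // Ask the coordinator for work; the reply now also carries the group size.
    TNextCompactionJob next = coordinatorClient.getCompactionJob(tinfo, credentials,
        groupName, compactorAddress, externalCompactionId);
    TExternalCompactionJob job = next.getJob();    // may have no compaction id set
    int compactorCount = next.getCompactorCount(); // compactors in this group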

diff --cc core/src/main/java/org/apache/accumulo/core/util/cache/Caches.java
index 3927f4d0c0,0000000000..5767f8b508
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/util/cache/Caches.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/cache/Caches.java
@@@ -1,108 -1,0 +1,109 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.core.util.cache;
 +
 +import static com.google.common.base.Suppliers.memoize;
 +
 +import java.util.function.Supplier;
 +
 +import org.apache.accumulo.core.metrics.MetricsProducer;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.github.benmanes.caffeine.cache.Caffeine;
 +
 +import io.micrometer.core.instrument.MeterRegistry;
 +import io.micrometer.core.instrument.binder.cache.CaffeineStatsCounter;
 +
 +public class Caches implements MetricsProducer {
 +
 +  public enum CacheName {
 +    BULK_IMPORT_FILE_LENGTHS,
 +    CLASSLOADERS,
 +    COMBINER_LOGGED_MSGS,
 +    COMPACTIONS_COMPLETED,
 +    COMPACTION_CONFIGS,
++    COMPACTOR_COUNTS,
 +    COMPACTION_DIR_CACHE,
 +    COMPACTION_DISPATCHERS,
 +    COMPACTION_SERVICE_ID,
 +    COMPACTOR_GROUP_ID,
 +    COMPRESSION_ALGORITHM,
 +    CRYPT_PASSWORDS,
 +    HOST_REGEX_BALANCER_TABLE_REGEX,
 +    INSTANCE_ID,
 +    NAMESPACE_ID,
 +    NAMESPACE_CONFIGS,
 +    PROP_CACHE,
 +    RECOVERY_MANAGER_PATH_CACHE,
 +    SCAN_SERVER_TABLET_METADATA,
 +    SERVICE_ENVIRONMENT_TABLE_CONFIGS,
 +    SPACE_AWARE_VOLUME_CHOICE,
 +    SPLITTER_FILES,
 +    SPLITTER_STARTING,
 +    SPLITTER_UNSPLITTABLE,
 +    TABLE_CONFIGS,
 +    TABLE_ID,
 +    TABLE_PARENT_CONFIGS,
 +    TABLE_ZOO_HELPER_CACHE,
 +    TSRM_FILE_LENGTHS,
 +    TINYLFU_BLOCK_CACHE,
 +    VOLUME_HDFS_CONFIGS
 +  }
 +
 +  private static final Logger LOG = LoggerFactory.getLogger(Caches.class);
 +  private static final Supplier<Caches> CACHES = memoize(() -> new Caches());
 +
 +  public static Caches getInstance() {
 +    return CACHES.get();
 +  }
 +
 +  private MeterRegistry registry = null;
 +
 +  private Caches() {}
 +
 +  @Override
 +  public void registerMetrics(MeterRegistry registry) {
 +    this.registry = registry;
 +  }
 +
 +  private boolean setupMicrometerMetrics(Caffeine<Object,Object> cacheBuilder, String name) {
 +    if (registry != null) {
 +      try {
 +        cacheBuilder.recordStats(() -> new CaffeineStatsCounter(registry, name));
 +        LOG.trace("Metrics enabled for {} cache.", name);
 +        return true;
 +      } catch (IllegalStateException e) {
 +        // recordStats was already called by the cacheBuilder.
 +      }
 +    }
 +    return false;
 +  }
 +
 +  public Caffeine<Object,Object> createNewBuilder(CacheName name, boolean emitMetricsIfEnabled) {
 +    Caffeine<Object,Object> cacheBuilder = Caffeine.newBuilder();
 +    boolean metricsConfigured = false;
 +    if (emitMetricsIfEnabled) {
 +      metricsConfigured = setupMicrometerMetrics(cacheBuilder, name.name());
 +    }
 +    LOG.trace("Caffeine builder created for {}, metrics enabled: {}", name, metricsConfigured);
 +    return cacheBuilder;
 +  }
 +
 +}
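
For reference, the COMPACTOR_COUNTS entry added above is consumed by the
CompactionCoordinator later in this diff. A minimal usage sketch of the
builder factory, assuming a ClientContext named context is in scope (the
loader lambda mirrors the coordinator's countCompactors):

    // Metrics are skipped here because emitMetricsIfEnabled is false.
    LoadingCache<String,Integer> compactorCounts =
        Caches.getInstance().createNewBuilder(CacheName.COMPACTOR_COUNTS, false)
            .expireAfterWrite(30, TimeUnit.SECONDS)
            .build(group -> ExternalCompactionUtil.countCompactors(group, context));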
diff --cc core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
index 48895192bd,bbcc3f1529..b52ed9f867
--- a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
@@@ -45,6 -46,6 +45,7 @@@ import org.apache.accumulo.core.tablets
  import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
  import org.apache.accumulo.core.trace.TraceUtil;
  import org.apache.accumulo.core.util.threads.ThreadPools;
++import org.apache.accumulo.core.util.time.NanoTime;
  import org.apache.thrift.TException;
  import org.apache.zookeeper.KeeperException;
  import org.apache.zookeeper.KeeperException.NoNodeException;
@@@ -274,9 -274,10 +275,10 @@@ public class ExternalCompactionUtil 
      return runningIds;
    }
  
 -  public static int countCompactors(String queueName, ClientContext context) {
 -    long start = System.nanoTime();
 -    String queueRoot = context.getZooKeeperRoot() + Constants.ZCOMPACTORS + "/" + queueName;
 -    List<String> children = context.getZooCache().getChildren(queueRoot);
 +  public static int countCompactors(String groupName, ClientContext context) {
++    var start = NanoTime.now();
 +    String groupRoot = context.getZooKeeperRoot() + Constants.ZCOMPACTORS + "/" + groupName;
 +    List<String> children = context.getZooCache().getChildren(groupRoot);
      if (children == null) {
        return 0;
      }
@@@ -290,6 -291,13 +292,13 @@@
        }
      }
  
 -    long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
++    long elapsed = start.elapsed().toMillis();
+     if (elapsed > 100) {
 -      LOG.debug("Took {} ms to count {} compactors for {}", elapsed, count, queueName);
++      LOG.debug("Took {} ms to count {} compactors for {}", elapsed, count, groupName);
+     } else {
 -      LOG.trace("Took {} ms to count {} compactors for {}", elapsed, count, queueName);
++      LOG.trace("Took {} ms to count {} compactors for {}", elapsed, count, groupName);
+     }
+ 
      return count;
    }
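
The hunks above swap raw System.nanoTime() arithmetic for the NanoTime
wrapper. A minimal sketch of the timing idiom, assuming only the
NanoTime.now() and elapsed() calls visible in the diff (countWork is a
hypothetical stand-in for the ZooKeeper walk):

    var start = NanoTime.now();                   // monotonic starting point
    int count = countWork();                      // the work being timed
    long elapsedMs = start.elapsed().toMillis();  // Duration since start, in ms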
  
diff --cc core/src/main/thrift-gen-java/org/apache/accumulo/core/compaction/thrift/CompactionCoordinatorService.java
index c7da84ef01,7b8af0438b..2870290607
--- a/core/src/main/thrift-gen-java/org/apache/accumulo/core/compaction/thrift/CompactionCoordinatorService.java
+++ b/core/src/main/thrift-gen-java/org/apache/accumulo/core/compaction/thrift/CompactionCoordinatorService.java
@@@ -31,7 -31,7 +31,7 @@@ public class CompactionCoordinatorServi
  
      public void compactionCompleted(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String externalCompactionId, org.apache.accumulo.core.dataImpl.thrift.TKeyExtent extent, org.apache.accumulo.core.tabletserver.thrift.TCompactionStats stats) throws org.apache.thrift.TException;
  
-     public org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob getCompactionJob(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String groupName, java.lang.String compactor, java.lang.String externalCompactionId) throws org.apache.thrift.TException;
 -    public TNextCompactionJob getCompactionJob(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String queueName, java.lang.String compactor, java.lang.String externalCompactionId) throws org.apache.thrift.TException;
++    public TNextCompactionJob getCompactionJob(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String groupName, java.lang.String compactor, java.lang.String externalCompactionId) throws org.apache.thrift.TException;
  
      public void updateCompactionStatus(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String externalCompactionId, TCompactionStatusUpdate status, long timestamp) throws org.apache.thrift.TException;
  
@@@ -49,7 -49,7 +49,7 @@@
  
      public void compactionCompleted(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String externalCompactionId, org.apache.accumulo.core.dataImpl.thrift.TKeyExtent extent, org.apache.accumulo.core.tabletserver.thrift.TCompactionStats stats, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws org.apache.thrift.TException;
  
-     public void getCompactionJob(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String groupName, java.lang.String compactor, java.lang.String externalCompactionId, org.apache.thrift.async.AsyncMethodCallback<org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob> resultHandler) throws org.apache.thrift.TException;
 -    public void getCompactionJob(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String queueName, java.lang.String compactor, java.lang.String externalCompactionId, org.apache.thrift.async.AsyncMethodCallback<TNextCompactionJob> resultHandler) throws org.apache.thrift.TException;
++    public void getCompactionJob(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String groupName, java.lang.String compactor, java.lang.String externalCompactionId, org.apache.thrift.async.AsyncMethodCallback<TNextCompactionJob> resultHandler) throws org.apache.thrift.TException;
  
      public void updateCompactionStatus(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String externalCompactionId, TCompactionStatusUpdate status, long timestamp, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws org.apache.thrift.TException;
  
@@@ -111,9 -111,9 +111,9 @@@
      }
  
      @Override
-     public 
org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob 
getCompactionJob(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, 
org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, 
java.lang.String groupName, java.lang.String compactor, java.lang.String 
externalCompactionId) throws org.apache.thrift.TException
 -    public TNextCompactionJob 
getCompactionJob(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, 
org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, 
java.lang.String queueName, java.lang.String compactor, java.lang.String 
externalCompactionId) throws org.apache.thrift.TException
++    public TNextCompactionJob 
getCompactionJob(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, 
org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, 
java.lang.String groupName, java.lang.String compactor, java.lang.String 
externalCompactionId) throws org.apache.thrift.TException
      {
 -      send_getCompactionJob(tinfo, credentials, queueName, compactor, 
externalCompactionId);
 +      send_getCompactionJob(tinfo, credentials, groupName, compactor, 
externalCompactionId);
        return recv_getCompactionJob();
      }
  
@@@ -328,20 -328,20 +328,20 @@@
      }
  
      @Override
-     public void getCompactionJob(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String groupName, java.lang.String compactor, java.lang.String externalCompactionId, org.apache.thrift.async.AsyncMethodCallback<org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob> resultHandler) throws org.apache.thrift.TException {
 -    public void getCompactionJob(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String queueName, java.lang.String compactor, java.lang.String externalCompactionId, org.apache.thrift.async.AsyncMethodCallback<TNextCompactionJob> resultHandler) throws org.apache.thrift.TException {
++    public void getCompactionJob(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String groupName, java.lang.String compactor, java.lang.String externalCompactionId, org.apache.thrift.async.AsyncMethodCallback<TNextCompactionJob> resultHandler) throws org.apache.thrift.TException {
        checkReady();
 -      getCompactionJob_call method_call = new getCompactionJob_call(tinfo, credentials, queueName, compactor, externalCompactionId, resultHandler, this, ___protocolFactory, ___transport);
 +      getCompactionJob_call method_call = new getCompactionJob_call(tinfo, credentials, groupName, compactor, externalCompactionId, resultHandler, this, ___protocolFactory, ___transport);
        this.___currentMethod = method_call;
        ___manager.call(method_call);
      }
  
-     public static class getCompactionJob_call extends org.apache.thrift.async.TAsyncMethodCall<org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob> {
+     public static class getCompactionJob_call extends org.apache.thrift.async.TAsyncMethodCall<TNextCompactionJob> {
        private org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo;
        private org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials;
 -      private java.lang.String queueName;
 +      private java.lang.String groupName;
        private java.lang.String compactor;
        private java.lang.String externalCompactionId;
-       public getCompactionJob_call(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String groupName, java.lang.String compactor, java.lang.String externalCompactionId, org.apache.thrift.async.AsyncMethodCallback<org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory,  [...]
 -      public getCompactionJob_call(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String queueName, java.lang.String compactor, java.lang.String externalCompactionId, org.apache.thrift.async.AsyncMethodCallback<TNextCompactionJob> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport [...]
++      public getCompactionJob_call(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String groupName, java.lang.String compactor, java.lang.String externalCompactionId, org.apache.thrift.async.AsyncMethodCallback<TNextCompactionJob> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport [...]
          super(client, protocolFactory, transport, resultHandler, false);
          this.tinfo = tinfo;
          this.credentials = credentials;
@@@ -955,8 -955,8 +955,8 @@@
        }
  
        @Override
-       public void start(I iface, getCompactionJob_args args, org.apache.thrift.async.AsyncMethodCallback<org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob> resultHandler) throws org.apache.thrift.TException {
+       public void start(I iface, getCompactionJob_args args, org.apache.thrift.async.AsyncMethodCallback<TNextCompactionJob> resultHandler) throws org.apache.thrift.TException {
 -        iface.getCompactionJob(args.tinfo, args.credentials, args.queueName, args.compactor, args.externalCompactionId,resultHandler);
 +        iface.getCompactionJob(args.tinfo, args.credentials, args.groupName, args.compactor, args.externalCompactionId,resultHandler);
        }
      }
  
diff --cc core/src/main/thrift/compaction-coordinator.thrift
index 8f9dbe9ce6,e5ff211393..d8d000bbd1
--- a/core/src/main/thrift/compaction-coordinator.thrift
+++ b/core/src/main/thrift/compaction-coordinator.thrift
@@@ -77,10 -84,10 +84,10 @@@ service CompactionCoordinatorService 
    /*
     * Called by Compactor to get the next compaction job
     */
-   tabletserver.TExternalCompactionJob getCompactionJob(
+   TNextCompactionJob getCompactionJob(
      1:client.TInfo tinfo
      2:security.TCredentials credentials
 -    3:string queueName
 +    3:string groupName
      4:string compactor
      5:string externalCompactionId
    )
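
TNextCompactionJob itself appears in this merge only through its generated
Java class, but from the accessors used elsewhere in the diff it wraps the
TExternalCompactionJob plus the group's compactor count. A hedged sketch of
the coordinator-side construction, mirroring the CompactionCoordinator hunk
below (result and compactorCounts are the fields shown there):

    // Hand back the job (possibly empty) together with the cached group size.
    return new TNextCompactionJob(result, compactorCounts.get(groupName));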
diff --cc server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index a39a6ca1c7,57f9f87978..5e2b0e6cd0
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@@ -667,10 -653,8 +668,7 @@@ public class Compactor extends Abstract
      return UUID::randomUUID;
    }
  
-   protected long getWaitTimeBetweenCompactionChecks() {
-     // get the total number of compactors assigned to this group
-     int numCompactors =
-         ExternalCompactionUtil.countCompactors(this.getResourceGroup(), getContext());
+   protected long getWaitTimeBetweenCompactionChecks(int numCompactors) {
 -    // get the total number of compactors assigned to this queue
      long minWait = getConfiguration().getTimeInMillis(Property.COMPACTOR_MIN_JOB_WAIT_TIME);
      // Aim for around 3 compactors checking in per min wait time.
      long sleepTime = numCompactors * minWait / 3;
@@@ -736,20 -707,13 +734,21 @@@
          err.set(null);
          JOB_HOLDER.reset();
  
 +        if (System.currentTimeMillis() > nextSortLogsCheckTime) {
 +          // Attempt to process all existing log sorting work serially in this thread.
 +          // When no work remains, this call will return so that we can look for compaction
 +          // work.
 +          LOG.debug("Checking to see if any recovery logs need sorting");
 +          nextSortLogsCheckTime = logSorter.sortLogsIfNeeded();
 +        }
 +
          TExternalCompactionJob job;
          try {
-           job = getNextJob(getNextId());
+           TNextCompactionJob next = getNextJob(getNextId());
+           job = next.getJob();
            if (!job.isSetExternalCompactionId()) {
 -            LOG.trace("No external compactions in queue {}", this.queueName);
 +            LOG.trace("No external compactions in group {}", this.getResourceGroup());
-             UtilWaitThread.sleep(getWaitTimeBetweenCompactionChecks());
+             UtilWaitThread.sleep(getWaitTimeBetweenCompactionChecks(next.getCompactorCount()));
              continue;
            }
            if (!job.getExternalCompactionId().equals(currentCompactionId.get().toString())) {
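
With the compactor count now delivered in the RPC reply, the back-off above
grows linearly with group size. A worked sketch of the arithmetic in
getWaitTimeBetweenCompactionChecks, under assumed values (any clamping of
sleepTime is elided here, as it is in the hunk):

    long minWait = 1000;                           // assume a 1s COMPACTOR_MIN_JOB_WAIT_TIME
    int numCompactors = 12;                        // supplied via TNextCompactionJob
    long sleepTime = numCompactors * minWait / 3;  // 4000 ms: ~3 check-ins per minWait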
diff --cc server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index f9872c3687,0000000000..355d5a8a5e
mode 100644,000000..100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@@ -1,1109 -1,0 +1,1118 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.manager.compaction.coordinator;
 +
 +import static java.util.concurrent.TimeUnit.SECONDS;
 +import static java.util.stream.Collectors.groupingBy;
 +import static java.util.stream.Collectors.toList;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.COMPACTED;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.ECOMP;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED;
 +
 +import java.io.FileNotFoundException;
 +import java.io.IOException;
 +import java.io.UncheckedIOException;
 +import java.time.Duration;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Objects;
 +import java.util.Optional;
 +import java.util.Set;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.CountDownLatch;
 +import java.util.concurrent.ScheduledFuture;
 +import java.util.concurrent.ScheduledThreadPoolExecutor;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicReference;
 +import java.util.function.Consumer;
 +import java.util.stream.Collectors;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.TableDeletedException;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.client.admin.CompactionConfig;
 +import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
 +import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode;
 +import org.apache.accumulo.core.clientImpl.thrift.TInfo;
 +import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
 +import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
 +import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
 +import org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException;
 +import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService;
 +import org.apache.accumulo.core.compaction.thrift.TCompactionState;
 +import org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate;
 +import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
 +import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList;
++import org.apache.accumulo.core.compaction.thrift.TNextCompactionJob;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.NamespaceId;
 +import org.apache.accumulo.core.dataImpl.KeyExtent;
 +import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
 +import org.apache.accumulo.core.fate.Fate;
 +import org.apache.accumulo.core.fate.FateId;
 +import org.apache.accumulo.core.fate.FateInstanceType;
 +import org.apache.accumulo.core.fate.FateKey;
 +import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
 +import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil;
 +import org.apache.accumulo.core.logging.TabletLogger;
 +import org.apache.accumulo.core.manager.state.tables.TableState;
 +import org.apache.accumulo.core.metadata.CompactableFileImpl;
 +import org.apache.accumulo.core.metadata.ReferencedTabletFile;
 +import org.apache.accumulo.core.metadata.StoredTabletFile;
 +import org.apache.accumulo.core.metadata.schema.Ample;
 +import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
 +import org.apache.accumulo.core.metadata.schema.Ample.RejectionHandler;
 +import org.apache.accumulo.core.metadata.schema.CompactionMetadata;
 +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
 +import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 +import org.apache.accumulo.core.metadata.schema.filters.HasExternalCompactionsFilter;
 +import org.apache.accumulo.core.metrics.MetricsProducer;
 +import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
 +import org.apache.accumulo.core.spi.compaction.CompactionJob;
 +import org.apache.accumulo.core.spi.compaction.CompactionKind;
 +import org.apache.accumulo.core.spi.compaction.CompactorGroupId;
 +import org.apache.accumulo.core.tabletserver.thrift.InputFile;
 +import org.apache.accumulo.core.tabletserver.thrift.IteratorConfig;
 +import org.apache.accumulo.core.tabletserver.thrift.TCompactionKind;
 +import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats;
 +import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
 +import org.apache.accumulo.core.util.Retry;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.core.util.cache.Caches.CacheName;
 +import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
 +import org.apache.accumulo.core.util.compaction.RunningCompaction;
 +import org.apache.accumulo.core.util.threads.ThreadPools;
 +import org.apache.accumulo.core.util.threads.Threads;
 +import org.apache.accumulo.core.util.time.SteadyTime;
 +import org.apache.accumulo.core.volume.Volume;
 +import org.apache.accumulo.manager.Manager;
 +import org.apache.accumulo.manager.compaction.coordinator.commit.CommitCompaction;
 +import org.apache.accumulo.manager.compaction.coordinator.commit.CompactionCommitData;
 +import org.apache.accumulo.manager.compaction.coordinator.commit.RenameCompactionFile;
 +import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues;
 +import org.apache.accumulo.server.ServerContext;
 +import org.apache.accumulo.server.compaction.CompactionConfigStorage;
 +import org.apache.accumulo.server.compaction.CompactionPluginUtils;
 +import org.apache.accumulo.server.security.SecurityOperation;
 +import org.apache.accumulo.server.tablets.TabletNameGenerator;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.thrift.TException;
 +import org.apache.zookeeper.KeeperException;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.github.benmanes.caffeine.cache.Cache;
 +import com.github.benmanes.caffeine.cache.CacheLoader;
 +import com.github.benmanes.caffeine.cache.LoadingCache;
 +import com.github.benmanes.caffeine.cache.Weigher;
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.base.Preconditions;
 +import com.google.common.collect.Sets;
 +import com.google.common.net.HostAndPort;
 +
 +import io.micrometer.core.instrument.Gauge;
 +import io.micrometer.core.instrument.MeterRegistry;
 +
 +public class CompactionCoordinator
 +    implements CompactionCoordinatorService.Iface, Runnable, MetricsProducer {
 +
 +  private static final Logger LOG = LoggerFactory.getLogger(CompactionCoordinator.class);
 +
 +  /*
 +   * Map of compactionId to RunningCompactions. This is an informational cache of what external
 +   * compactions may be running. It's possible it may contain external compactions that are not
 +   * actually running. It may not contain compactions that are actually running. The metadata table
 +   * is the most authoritative source of what external compactions are currently running, but it
 +   * does not have the stats that this map has.
 +   */
 +  protected final Map<ExternalCompactionId,RunningCompaction> RUNNING_CACHE =
 +      new ConcurrentHashMap<>();
 +
 +  /* Map of group name to last time a compactor called to get a compaction job */
 +  // ELASTICITY_TODO #4403 need to clean out groups that are no longer configured.
 +  private final Map<CompactorGroupId,Long> TIME_COMPACTOR_LAST_CHECKED = new ConcurrentHashMap<>();
 +
 +  private final ServerContext ctx;
 +  private final SecurityOperation security;
 +  private final String resourceGroupName;
 +  private final CompactionJobQueues jobQueues;
 +  private final AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances;
 +  // Exposed for tests
 +  protected CountDownLatch shutdown = new CountDownLatch(1);
 +
 +  private final ScheduledThreadPoolExecutor schedExecutor;
 +
 +  private final Cache<ExternalCompactionId,RunningCompaction> completed;
 +  private final LoadingCache<FateId,CompactionConfig> compactionConfigCache;
 +  private final Cache<Path,Integer> tabletDirCache;
 +  private final DeadCompactionDetector deadCompactionDetector;
 +
 +  private QueueMetrics queueMetrics;
 +  private final Manager manager;
 +
++  private final LoadingCache<String,Integer> compactorCounts;
++
 +  public CompactionCoordinator(ServerContext ctx, SecurityOperation security,
 +      AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances,
 +      final String resourceGroupName, Manager manager) {
 +    this.ctx = ctx;
 +    this.schedExecutor = this.ctx.getScheduledExecutor();
 +    this.security = security;
 +    this.resourceGroupName = resourceGroupName;
 +    this.manager = Objects.requireNonNull(manager);
 +
 +    this.jobQueues = new CompactionJobQueues(
 +        ctx.getConfiguration().getCount(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE));
 +
 +    this.queueMetrics = new QueueMetrics(jobQueues);
 +
 +    this.fateInstances = fateInstances;
 +
 +    completed = ctx.getCaches().createNewBuilder(CacheName.COMPACTIONS_COMPLETED, true)
 +        .maximumSize(200).expireAfterWrite(10, TimeUnit.MINUTES).build();
 +
 +    CacheLoader<FateId,CompactionConfig> loader =
 +        fateId -> CompactionConfigStorage.getConfig(ctx, fateId);
 +
 +    // Keep a small short lived cache of compaction config. Compaction config never changes; however,
 +    // when a compaction is canceled it is deleted, which is why there is a time limit. It does not
 +    // hurt to let a job that was canceled start; it will be canceled later. Caching this immutable
 +    // config will help avoid reading the same data over and over.
 +    compactionConfigCache = ctx.getCaches().createNewBuilder(CacheName.COMPACTION_CONFIGS, true)
 +        .expireAfterWrite(30, SECONDS).maximumSize(100).build(loader);
 +
 +    Weigher<Path,Integer> weigher = (path, count) -> {
 +      return path.toUri().toString().length();
 +    };
 +
 +    tabletDirCache = ctx.getCaches().createNewBuilder(CacheName.COMPACTION_DIR_CACHE, true)
 +        .maximumWeight(10485760L).weigher(weigher).build();
 +
 +    deadCompactionDetector =
 +        new DeadCompactionDetector(this.ctx, this, schedExecutor, fateInstances);
++
++    compactorCounts = ctx.getCaches().createNewBuilder(CacheName.COMPACTOR_COUNTS, false)
++        .expireAfterWrite(30, TimeUnit.SECONDS).build(this::countCompactors);
 +    // At this point the manager does not have its lock, so no actions should be taken yet
 +  }
 +
++  protected int countCompactors(String groupName) {
++    return ExternalCompactionUtil.countCompactors(groupName, ctx);
++  }
++
 +  private volatile Thread serviceThread = null;
 +
 +  public void start() {
 +    serviceThread = Threads.createThread("CompactionCoordinator Thread", this);
 +    serviceThread.start();
 +  }
 +
 +  public void shutdown() {
 +    shutdown.countDown();
 +    var localThread = serviceThread;
 +    if (localThread != null) {
 +      try {
 +        localThread.join();
 +      } catch (InterruptedException e) {
 +        LOG.error("Exception stopping compaction coordinator thread", e);
 +      }
 +    }
 +  }
 +
 +  protected void startCompactionCleaner(ScheduledThreadPoolExecutor schedExecutor) {
 +    ScheduledFuture<?> future =
 +        schedExecutor.scheduleWithFixedDelay(this::cleanUpCompactors, 0, 5, TimeUnit.MINUTES);
 +    ThreadPools.watchNonCriticalScheduledTask(future);
 +  }
 +
 +  protected void startRunningCleaner(ScheduledThreadPoolExecutor schedExecutor) {
 +    ScheduledFuture<?> future =
 +        schedExecutor.scheduleWithFixedDelay(this::cleanUpRunning, 0, 5, TimeUnit.MINUTES);
 +    ThreadPools.watchNonCriticalScheduledTask(future);
 +  }
 +
 +  protected void startIdleCompactionWatcher() {
 +
 +    ScheduledFuture<?> future = schedExecutor.scheduleWithFixedDelay(this::idleCompactionWarning,
 +        getTServerCheckInterval(), getTServerCheckInterval(), TimeUnit.MILLISECONDS);
 +    ThreadPools.watchNonCriticalScheduledTask(future);
 +  }
 +
 +  private void idleCompactionWarning() {
 +
 +    long now = System.currentTimeMillis();
 +    Map<String,Set<HostAndPort>> idleCompactors = getIdleCompactors();
 +    TIME_COMPACTOR_LAST_CHECKED.forEach((groupName, lastCheckTime) -> {
 +      if ((now - lastCheckTime) > getMissingCompactorWarningTime()
 +          && jobQueues.getQueuedJobs(groupName) > 0
 +          && idleCompactors.containsKey(groupName.canonical())) {
 +        LOG.warn("No compactors have checked in with coordinator for group {} in {}ms", groupName,
 +            getMissingCompactorWarningTime());
 +      }
 +    });
 +
 +  }
 +
 +  @Override
 +  public void run() {
 +    startCompactionCleaner(schedExecutor);
 +    startRunningCleaner(schedExecutor);
 +
 +    // On a re-start of the coordinator it's possible that external compactions are in-progress.
 +    // Attempt to get the running compactions on the compactors and then resolve which tserver
 +    // the external compaction came from to re-populate the RUNNING collection.
 +    LOG.info("Checking for running external compactions");
 +    // On re-start contact the running Compactors to try and seed the list of running compactions
 +    List<RunningCompaction> running = getCompactionsRunningOnCompactors();
 +    if (running.isEmpty()) {
 +      LOG.info("No running external compactions found");
 +    } else {
 +      LOG.info("Found {} running external compactions", running.size());
 +      running.forEach(rc -> {
 +        TCompactionStatusUpdate update = new TCompactionStatusUpdate();
 +        update.setState(TCompactionState.IN_PROGRESS);
 +        update.setMessage("Coordinator restarted, compaction found in progress");
 +        rc.addUpdate(System.currentTimeMillis(), update);
 +        RUNNING_CACHE.put(ExternalCompactionId.of(rc.getJob().getExternalCompactionId()), rc);
 +      });
 +    }
 +
 +    startDeadCompactionDetector();
 +
 +    startIdleCompactionWatcher();
 +
 +    try {
 +      shutdown.await();
 +    } catch (InterruptedException e) {
 +      LOG.warn("Interrupted waiting for shutdown latch.", e);
 +    }
 +
 +    LOG.info("Shutting down");
 +  }
 +
 +  private Map<String,Set<HostAndPort>> getIdleCompactors() {
 +
 +    Map<String,Set<HostAndPort>> allCompactors = new HashMap<>();
 +    ExternalCompactionUtil.getCompactorAddrs(ctx)
 +        .forEach((group, compactorList) -> allCompactors.put(group, new HashSet<>(compactorList)));
 +
 +    Set<String> emptyQueues = new HashSet<>();
 +
 +    // Remove all of the compactors that are running a compaction
 +    RUNNING_CACHE.values().forEach(rc -> {
 +      Set<HostAndPort> busyCompactors = allCompactors.get(rc.getGroupName());
 +      if (busyCompactors != null
 +          && busyCompactors.remove(HostAndPort.fromString(rc.getCompactorAddress()))) {
 +        if (busyCompactors.isEmpty()) {
 +          emptyQueues.add(rc.getGroupName());
 +        }
 +      }
 +    });
 +    // Remove entries with empty queues
 +    emptyQueues.forEach(e -> allCompactors.remove(e));
 +    return allCompactors;
 +  }
 +
 +  protected void startDeadCompactionDetector() {
 +    deadCompactionDetector.start();
 +  }
 +
 +  protected long getMissingCompactorWarningTime() {
 +    return this.ctx.getConfiguration().getTimeInMillis(Property.COMPACTOR_MAX_JOB_WAIT_TIME) * 3;
 +  }
 +
 +  protected long getTServerCheckInterval() {
 +    return this.ctx.getConfiguration()
 +        .getTimeInMillis(Property.COMPACTION_COORDINATOR_TSERVER_COMPACTION_CHECK_INTERVAL);
 +  }
 +
 +  public long getNumRunningCompactions() {
 +    return RUNNING_CACHE.size();
 +  }
 +
 +  /**
 +   * Return the next compaction job from the queue to a Compactor
 +   *
 +   * @param groupName group
 +   * @param compactorAddress compactor address
 +   * @throws ThriftSecurityException when permission error
 +   * @return compaction job
 +   */
 +  @Override
-   public TExternalCompactionJob getCompactionJob(TInfo tinfo, TCredentials credentials,
++  public TNextCompactionJob getCompactionJob(TInfo tinfo, TCredentials credentials,
 +      String groupName, String compactorAddress, String externalCompactionId)
 +      throws ThriftSecurityException {
 +
 +    // do not expect users to call this directly, expect compactors to call this method
 +    if (!security.canPerformSystemActions(credentials)) {
 +      throw new AccumuloSecurityException(credentials.getPrincipal(),
 +          SecurityErrorCode.PERMISSION_DENIED).asThriftException();
 +    }
 +    CompactorGroupId groupId = CompactorGroupId.of(groupName);
 +    LOG.trace("getCompactionJob called for group {} by compactor {}", groupId, compactorAddress);
 +    TIME_COMPACTOR_LAST_CHECKED.put(groupId, System.currentTimeMillis());
 +
 +    TExternalCompactionJob result = null;
 +
 +    CompactionJobQueues.MetaJob metaJob = jobQueues.poll(groupId);
 +
 +    while (metaJob != null) {
 +
 +      Optional<CompactionConfig> compactionConfig = getCompactionConfig(metaJob);
 +
 +      // this method may reread the metadata, do not use the metadata in metaJob for anything after
 +      // this method
 +      CompactionMetadata ecm = null;
 +
 +      var kind = metaJob.getJob().getKind();
 +
 +      // Only reserve user compactions when the config is present. When compactions are canceled the
 +      // config is deleted.
 +      var cid = ExternalCompactionId.from(externalCompactionId);
 +      if (kind == CompactionKind.SYSTEM
 +          || (kind == CompactionKind.USER && compactionConfig.isPresent())) {
 +        ecm = reserveCompaction(metaJob, compactorAddress, cid);
 +      }
 +
 +      if (ecm != null) {
 +        result = createThriftJob(externalCompactionId, ecm, metaJob, compactionConfig);
 +        // It is possible that by the time this is added, the compactor that made this request
 +        // is dead. In this case the compaction is not actually running.
 +        RUNNING_CACHE.put(ExternalCompactionId.of(result.getExternalCompactionId()),
 +            new RunningCompaction(result, compactorAddress, groupName));
 +        TabletLogger.compacting(metaJob.getTabletMetadata(), cid, compactorAddress,
 +            metaJob.getJob());
 +        break;
 +      } else {
 +        LOG.debug(
 +            "Unable to reserve compaction job for {}, pulling another off the queue for group {}",
 +            metaJob.getTabletMetadata().getExtent(), groupName);
 +        metaJob = jobQueues.poll(CompactorGroupId.of(groupName));
 +      }
 +    }
 +
 +    if (metaJob == null) {
 +      LOG.debug("No jobs found in group {} ", groupName);
 +    }
 +
 +    if (result == null) {
 +      LOG.trace("No jobs found for group {}, returning empty job to compactor {}", groupName,
 +          compactorAddress);
 +      result = new TExternalCompactionJob();
 +    }
 +
-     return result;
- 
++    return new TNextCompactionJob(result, compactorCounts.get(groupName));
 +  }
 +
 +  @VisibleForTesting
 +  public static boolean canReserveCompaction(TabletMetadata tablet, CompactionKind kind,
 +      Set<StoredTabletFile> jobFiles, ServerContext ctx, SteadyTime steadyTime) {
 +
 +    if (tablet == null) {
 +      // the tablet no longer exists
 +      return false;
 +    }
 +
 +    if (tablet.getOperationId() != null) {
 +      return false;
 +    }
 +
 +    if (ctx.getTableState(tablet.getTableId()) != TableState.ONLINE) {
 +      return false;
 +    }
 +
 +    if (!tablet.getFiles().containsAll(jobFiles)) {
 +      return false;
 +    }
 +
 +    var currentlyCompactingFiles = tablet.getExternalCompactions().values().stream()
 +        .flatMap(ecm -> ecm.getJobFiles().stream()).collect(Collectors.toSet());
 +
 +    if (!Collections.disjoint(jobFiles, currentlyCompactingFiles)) {
 +      return false;
 +    }
 +
 +    switch (kind) {
 +      case SYSTEM:
 +        var userRequestedCompactions = tablet.getUserCompactionsRequested().size();
 +        if (userRequestedCompactions > 0) {
 +          LOG.debug(
 +              "Unable to reserve {} for system compaction, tablet has {} pending requested user compactions",
 +              tablet.getExtent(), userRequestedCompactions);
 +          return false;
 +        } else if (!Collections.disjoint(jobFiles,
 +            getFilesReservedBySelection(tablet, steadyTime, ctx))) {
 +          return false;
 +        }
 +        break;
 +      case USER:
 +        if (tablet.getSelectedFiles() == null
 +            || !tablet.getSelectedFiles().getFiles().containsAll(jobFiles)) {
 +          return false;
 +        }
 +        break;
 +      default:
 +        throw new UnsupportedOperationException("Not currently handling " + kind);
 +    }
 +
 +    return true;
 +  }
 +
 +  private void checkTabletDir(KeyExtent extent, Path path) {
 +    try {
 +      if (tabletDirCache.getIfPresent(path) == null) {
 +        FileStatus[] files = null;
 +        try {
 +          files = ctx.getVolumeManager().listStatus(path);
 +        } catch (FileNotFoundException ex) {
 +          // ignored
 +        }
 +
 +        if (files == null) {
 +          LOG.debug("Tablet {} had no dir, creating {}", extent, path);
 +
 +          ctx.getVolumeManager().mkdirs(path);
 +        }
 +        tabletDirCache.put(path, 1);
 +      }
 +    } catch (IOException e) {
 +      throw new UncheckedIOException(e);
 +    }
 +  }
 +
 +  protected CompactionMetadata createExternalCompactionMetadata(CompactionJob job,
 +      Set<StoredTabletFile> jobFiles, TabletMetadata tablet, String compactorAddress,
 +      ExternalCompactionId externalCompactionId) {
 +    boolean propDels;
 +
 +    FateId fateId = null;
 +
 +    switch (job.getKind()) {
 +      case SYSTEM: {
 +        boolean compactingAll = tablet.getFiles().equals(jobFiles);
 +        propDels = !compactingAll;
 +      }
 +        break;
 +      case USER: {
 +        boolean compactingAll = tablet.getSelectedFiles().initiallySelectedAll()
 +            && tablet.getSelectedFiles().getFiles().equals(jobFiles);
 +        propDels = !compactingAll;
 +        fateId = tablet.getSelectedFiles().getFateId();
 +      }
 +        break;
 +      default:
 +        throw new IllegalArgumentException();
 +    }
 +
 +    Consumer<String> directoryCreator = dir -> checkTabletDir(tablet.getExtent(), new Path(dir));
 +    ReferencedTabletFile newFile = TabletNameGenerator.getNextDataFilenameForMajc(propDels, ctx,
 +        tablet, directoryCreator, externalCompactionId);
 +
 +    return new CompactionMetadata(jobFiles, newFile, compactorAddress, job.getKind(),
 +        job.getPriority(), job.getGroup(), propDels, fateId);
 +
 +  }
 +
 +  protected CompactionMetadata reserveCompaction(CompactionJobQueues.MetaJob metaJob,
 +      String compactorAddress, ExternalCompactionId externalCompactionId) {
 +
 +    Preconditions.checkArgument(metaJob.getJob().getKind() == CompactionKind.SYSTEM
 +        || metaJob.getJob().getKind() == CompactionKind.USER);
 +
 +    var tabletMetadata = metaJob.getTabletMetadata();
 +
 +    var jobFiles = metaJob.getJob().getFiles().stream().map(CompactableFileImpl::toStoredTabletFile)
 +        .collect(Collectors.toSet());
 +
 +    Retry retry = Retry.builder().maxRetries(5).retryAfter(Duration.ofMillis(100))
 +        .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(10)).backOffFactor(1.5)
 +        .logInterval(Duration.ofMinutes(3)).createRetry();
 +
 +    while (retry.canRetry()) {
 +      try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) {
 +        var extent = metaJob.getTabletMetadata().getExtent();
 +
 +        if (!canReserveCompaction(tabletMetadata, metaJob.getJob().getKind(), jobFiles, ctx,
 +            manager.getSteadyTime())) {
 +          return null;
 +        }
 +
 +        var ecm = createExternalCompactionMetadata(metaJob.getJob(), jobFiles, tabletMetadata,
 +            compactorAddress, externalCompactionId);
 +
 +        // any data that is read from the tablet to make a decision about if it can compact or not
 +        // must be included in the requireSame call
 +        var tabletMutator = tabletsMutator.mutateTablet(extent).requireAbsentOperation()
 +            .requireSame(tabletMetadata, FILES, SELECTED, ECOMP);
 +
 +        if (metaJob.getJob().getKind() == CompactionKind.SYSTEM) {
 +          var selectedFiles = tabletMetadata.getSelectedFiles();
 +          var reserved = getFilesReservedBySelection(tabletMetadata, manager.getSteadyTime(), ctx);
 +
 +          // If there is a selectedFiles column and the reserved set is empty, this means that
 +          // either no user jobs were completed yet or the selection expiration time has passed,
 +          // so the column is eligible to be deleted and a system job can run instead
 +          if (selectedFiles != null && reserved.isEmpty()
 +              && !Collections.disjoint(jobFiles, selectedFiles.getFiles())) {
 +            LOG.debug("Deleting user compaction selected files for {} {}", extent,
 +                externalCompactionId);
 +            tabletMutator.deleteSelectedFiles();
 +          }
 +        }
 +
 +        tabletMutator.putExternalCompaction(externalCompactionId, ecm);
 +        tabletMutator.submit(tm -> tm.getExternalCompactions().containsKey(externalCompactionId));
 +
 +        var result = tabletsMutator.process().get(extent);
 +
 +        if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) {
 +          return ecm;
 +        } else {
 +          tabletMetadata = result.readMetadata();
 +        }
 +      }
 +
 +      retry.useRetry();
 +      try {
 +        retry.waitForNextAttempt(LOG,
 +            "Reserved compaction for " + metaJob.getTabletMetadata().getExtent());
 +      } catch (InterruptedException e) {
 +        throw new RuntimeException(e);
 +      }
 +    }
 +
 +    return null;
 +  }
 +
 +  protected TExternalCompactionJob createThriftJob(String externalCompactionId,
 +      CompactionMetadata ecm, CompactionJobQueues.MetaJob metaJob,
 +      Optional<CompactionConfig> compactionConfig) {
 +
 +    Set<CompactableFile> selectedFiles;
 +    if (metaJob.getJob().getKind() == CompactionKind.SYSTEM) {
 +      selectedFiles = Set.of();
 +    } else {
 +      selectedFiles = metaJob.getTabletMetadata().getSelectedFiles().getFiles().stream()
 +          .map(file -> new CompactableFileImpl(file,
 +              metaJob.getTabletMetadata().getFilesMap().get(file)))
 +          .collect(Collectors.toUnmodifiableSet());
 +    }
 +
 +    Map<String,String> overrides = CompactionPluginUtils.computeOverrides(compactionConfig, ctx,
 +        metaJob.getTabletMetadata().getExtent(), metaJob.getJob().getFiles(), selectedFiles);
 +
 +    IteratorConfig iteratorSettings = SystemIteratorUtil
 +        .toIteratorConfig(compactionConfig.map(CompactionConfig::getIterators).orElse(List.of()));
 +
 +    var files = ecm.getJobFiles().stream().map(storedTabletFile -> {
 +      var dfv = metaJob.getTabletMetadata().getFilesMap().get(storedTabletFile);
 +      return new InputFile(storedTabletFile.getMetadata(), dfv.getSize(), dfv.getNumEntries(),
 +          dfv.getTime());
 +    }).collect(toList());
 +
 +    // The fateId here corresponds to the Fate transaction that is driving a user initiated
 +    // compaction. A system initiated compaction has no Fate transaction driving it, so it's ok to set
 +    // it to null. If anything tries to use the id for a system compaction and triggers an NPE, it's
 +    // probably a bug that needs to be fixed.
 +    FateId fateId = null;
 +    if (metaJob.getJob().getKind() == CompactionKind.USER) {
 +      fateId = metaJob.getTabletMetadata().getSelectedFiles().getFateId();
 +    }
 +
 +    return new TExternalCompactionJob(externalCompactionId,
 +        metaJob.getTabletMetadata().getExtent().toThrift(), files, iteratorSettings,
 +        ecm.getCompactTmpName().getNormalizedPathStr(), ecm.getPropagateDeletes(),
 +        TCompactionKind.valueOf(ecm.getKind().name()), fateId == null ? null : fateId.toThrift(),
 +        overrides);
 +  }
 +
 +  @Override
 +  public void registerMetrics(MeterRegistry registry) {
 +    Gauge.builder(METRICS_MAJC_QUEUED, jobQueues, CompactionJobQueues::getQueuedJobCount)
 +        .tag("subprocess", "compaction.coordinator")
 +        .description("Number of queued major compactions").register(registry);
 +    Gauge.builder(METRICS_MAJC_RUNNING, this, CompactionCoordinator::getNumRunningCompactions)
 +        .tag("subprocess", "compaction.coordinator")
 +        .description("Number of running major compactions").register(registry);
 +
 +    queueMetrics.registerMetrics(registry);
 +  }
 +
 +  public void addJobs(TabletMetadata tabletMetadata, Collection<CompactionJob> jobs) {
 +    jobQueues.add(tabletMetadata, jobs);
 +  }
 +
 +  public CompactionCoordinatorService.Iface getThriftService() {
 +    return this;
 +  }
 +
 +  private Optional<CompactionConfig> getCompactionConfig(CompactionJobQueues.MetaJob metaJob) {
 +    if (metaJob.getJob().getKind() == CompactionKind.USER
 +        && metaJob.getTabletMetadata().getSelectedFiles() != null) {
 +      var cconf =
 +          compactionConfigCache.get(metaJob.getTabletMetadata().getSelectedFiles().getFateId());
 +      return Optional.ofNullable(cconf);
 +    }
 +    return Optional.empty();
 +  }
 +
 +  /**
 +   * Compactors call this method when they have finished a compaction. This method does the
 +   * following.
 +   *
 +   * <ol>
 +   * <li>Reads the tablet's metadata and determines if the compaction can commit. It's possible that
 +   * things changed while the compaction was running and it can no longer commit.</li>
 +   * <li>Commits the compaction using a conditional mutation. If the tablet's files or location
 +   * changed since reading the tablet's metadata, then the conditional mutation will fail. When this
 +   * happens it will reread the metadata and conceptually go back to step 1. When committing a
 +   * compaction the compacted files are removed and scan entries are added to the tablet in case the
 +   * files are in use; this prevents GC from deleting the files between updating tablet metadata and
 +   * refreshing the tablet. The scan entries are only added when a tablet has a location.</li>
 +   * <li>After a successful commit a refresh request is sent to the tablet if it has a location. This
 +   * will cause the tablet to start using the newly compacted files for future scans. Also the
 +   * tablet can delete the scan entries if there are no active scans using them.</li>
 +   * </ol>
 +   *
 +   * <p>
 +   * User compactions will be refreshed as part of the fate operation. The user compaction fate
 +   * operation will see the compaction was committed after this code updates the tablet metadata;
 +   * however, if it were to rely on this code to do the refresh, it would not be able to know when the
 +   * refresh was actually done. Therefore, user compactions will refresh as part of the fate
 +   * operation so that it's known to be done before the fate operation returns. Since the fate
 +   * operation will do it, there is no need to do it here for user compactions.
 +   *
 +   * @param tinfo trace info
 +   * @param credentials tcredentials object
 +   * @param externalCompactionId compaction id
 +   * @param textent tablet extent
 +   * @param stats compaction stats
 +   * @throws ThriftSecurityException when permission error
 +   */
 +  @Override
 +  public void compactionCompleted(TInfo tinfo, TCredentials credentials,
 +      String externalCompactionId, TKeyExtent textent, TCompactionStats stats)
 +      throws ThriftSecurityException {
 +    // do not expect users to call this directly, expect other tservers to call this method
 +    if (!security.canPerformSystemActions(credentials)) {
 +      throw new AccumuloSecurityException(credentials.getPrincipal(),
 +          SecurityErrorCode.PERMISSION_DENIED).asThriftException();
 +    }
 +
 +    // maybe fate has not started yet
 +    var localFates = fateInstances.get();
 +    while (localFates == null) {
 +      UtilWaitThread.sleep(100);
 +      if (shutdown.getCount() == 0) {
 +        return;
 +      }
 +      localFates = fateInstances.get();
 +    }
 +
 +    var extent = KeyExtent.fromThrift(textent);
 +    var localFate = localFates.get(FateInstanceType.fromTableId(extent.tableId()));
 +
 +    LOG.info("Compaction completed, id: {}, stats: {}, extent: {}", externalCompactionId, stats,
 +        extent);
 +    final var ecid = ExternalCompactionId.of(externalCompactionId);
 +
 +    var tabletMeta =
 +        ctx.getAmple().readTablet(extent, ECOMP, SELECTED, LOCATION, FILES, COMPACTED, OPID);
 +
 +    var tableState = manager.getContext().getTableState(extent.tableId());
 +    if (tableState != TableState.ONLINE) {
 +      // It's important this check is done after the compaction id is set in the metadata table to
 +      // avoid race conditions with the client code that waits for tables to go offline. That code
 +      // looks for compaction ids in the metadata table after setting the table state. When that
 +      // client code sees nothing for a tablet, it's important that nothing changes the tablet's
 +      // files after that point in time, which this check ensures.
 +      LOG.debug("Not committing compaction {} for {} because of table state {}", ecid, extent,
 +          tableState);
 +      // cleanup metadata table and files related to the compaction
 +      compactionsFailed(Map.of(ecid, extent));
 +      return;
 +    }
 +
 +    if (!CommitCompaction.canCommitCompaction(ecid, tabletMeta)) {
 +      return;
 +    }
 +
 +    // Start a fate transaction to commit the compaction.
 +    CompactionMetadata ecm = tabletMeta.getExternalCompactions().get(ecid);
 +    var renameOp = new RenameCompactionFile(new CompactionCommitData(ecid, extent, ecm, stats));
 +    var txid = localFate.seedTransaction("COMMIT_COMPACTION", FateKey.forCompactionCommit(ecid),
 +        renameOp, true, "Commit compaction " + ecid);
 +
 +    txid.ifPresentOrElse(fateId -> LOG.debug("initiated compaction commit {} {}", ecid, fateId),
 +        () -> LOG.debug("compaction commit already initiated for {}", ecid));
 +  }
 +
 +  @Override
 +  public void compactionFailed(TInfo tinfo, TCredentials credentials, String externalCompactionId,
 +      TKeyExtent extent) throws ThriftSecurityException {
 +    // do not expect users to call this directly, expect other tservers to call this method
 +    if (!security.canPerformSystemActions(credentials)) {
 +      throw new AccumuloSecurityException(credentials.getPrincipal(),
 +          SecurityErrorCode.PERMISSION_DENIED).asThriftException();
 +    }
 +    KeyExtent fromThriftExtent = KeyExtent.fromThrift(extent);
 +    LOG.info("Compaction failed, id: {}, extent: {}", externalCompactionId, 
fromThriftExtent);
 +    final var ecid = ExternalCompactionId.of(externalCompactionId);
 +    compactionsFailed(Map.of(ecid, fromThriftExtent));
 +  }
 +
 +  void compactionsFailed(Map<ExternalCompactionId,KeyExtent> compactions) {
 +    // Need to process each level by itself because the conditional tablet mutator does not
 +    // support mutating multiple data levels at the same time
 +    compactions.entrySet().stream()
 +        .collect(groupingBy(entry -> DataLevel.of(entry.getValue().tableId()),
 +            Collectors.toMap(Entry::getKey, Entry::getValue)))
 +        .forEach((level, compactionsByLevel) -> compactionFailedForLevel(compactionsByLevel));
 +  }
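
The groupingBy/toMap collector above splits one map into a map per data level before
mutating. A self-contained sketch of the same idiom, with hypothetical String keys and
a made-up level rule standing in for DataLevel.of():

    import java.util.Map;
    import java.util.Map.Entry;
    import java.util.stream.Collectors;

    public class PartitionByLevelSketch {
      public static void main(String[] args) {
        // key = compaction id, value = table id; pretend metadata table ids start with "!"
        Map<String,String> compactions = Map.of("ecid1", "t1", "ecid2", "!0", "ecid3", "t2");

        // group entries by a derived level, keeping each group as its own map
        Map<String,Map<String,String>> byLevel = compactions.entrySet().stream()
            .collect(Collectors.groupingBy(e -> e.getValue().startsWith("!") ? "METADATA" : "USER",
                Collectors.toMap(Entry::getKey, Entry::getValue)));

        // each group can now be processed with its own conditional mutator
        byLevel.forEach((level, group) -> System.out.println(level + " -> " + group));
      }
    }
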
 +
 +  void compactionFailedForLevel(Map<ExternalCompactionId,KeyExtent> compactions) {
 +
 +    try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) {
 +      compactions.forEach((ecid, extent) -> {
 +        try {
 +          ctx.requireNotDeleted(extent.tableId());
 +          tabletsMutator.mutateTablet(extent).requireAbsentOperation().requireCompaction(ecid)
 +              .deleteExternalCompaction(ecid).submit(new RejectionHandler() {
 +
 +                @Override
 +                public boolean callWhenTabletDoesNotExists() {
 +                  return true;
 +                }
 +
 +                @Override
 +                public boolean test(TabletMetadata tabletMetadata) {
 +                  return tabletMetadata == null
 +                      || !tabletMetadata.getExternalCompactions().containsKey(ecid);
 +                }
 +
 +              });
 +        } catch (TableDeletedException e) {
 +          LOG.warn("Table {} was deleted, unable to update metadata for 
compaction failure.",
 +              extent.tableId());
 +        }
 +      });
 +
 +      final List<ExternalCompactionId> ecidsForTablet = new ArrayList<>();
 +      tabletsMutator.process().forEach((extent, result) -> {
 +        if (result.getStatus() != Ample.ConditionalResult.Status.ACCEPTED) {
 +
 +          // this should try again later when the dead compaction detector runs; let's log it
 +          // in case it's a persistent problem
 +          if (LOG.isDebugEnabled()) {
 +            var ecid =
 +                compactions.entrySet().stream().filter(entry -> entry.getValue().equals(extent))
 +                    .findFirst().map(Map.Entry::getKey).orElse(null);
 +            LOG.debug("Unable to remove failed compaction {} {}", extent, ecid);
 +          }
 +        } else {
 +          // compactionFailed is called from the Compactor when either a compaction fails or
 +          // is cancelled and it's called from the DeadCompactionDetector. This block is
 +          // entered when the conditional mutator above successfully deletes an ecid from
 +          // the tablet metadata. Remove compaction tmp files from the tablet directory
 +          // that have a corresponding ecid in the name.
 +
 +          ecidsForTablet.clear();
 +          compactions.entrySet().stream().filter(e -> e.getValue().compareTo(extent) == 0)
 +              .map(Entry::getKey).forEach(ecidsForTablet::add);
 +
 +          if (!ecidsForTablet.isEmpty()) {
 +            final TabletMetadata tm = ctx.getAmple().readTablet(extent, ColumnType.DIR);
 +            if (tm != null) {
 +              final Collection<Volume> vols = ctx.getVolumeManager().getVolumes();
 +              for (Volume vol : vols) {
 +                try {
 +                  final String volPath =
 +                      vol.getBasePath() + Constants.HDFS_TABLES_DIR + Path.SEPARATOR
 +                          + extent.tableId().canonical() + Path.SEPARATOR + tm.getDirName();
 +                  final FileSystem fs = vol.getFileSystem();
 +                  for (ExternalCompactionId ecid : ecidsForTablet) {
 +                    final String fileSuffix = "_tmp_" + ecid.canonical();
 +                    FileStatus[] files = fs.listStatus(new Path(volPath), (path) -> {
 +                      return path.getName().endsWith(fileSuffix);
 +                    });
 +                    if (files.length > 0) {
 +                      for (FileStatus file : files) {
 +                        if (!fs.delete(file.getPath(), false)) {
 +                          LOG.warn("Unable to delete ecid tmp file: {}", file.getPath());
 +                        } else {
 +                          LOG.debug("Deleted ecid tmp file: {}", file.getPath());
 +                        }
 +                      }
 +                    }
 +                  }
 +                } catch (IOException e) {
 +                  LOG.error("Exception deleting compaction tmp files for tablet: {}", extent, e);
 +                }
 +              }
 +            } else {
 +              // TabletMetadata does not exist for the extent. This could be due to a merge or
 +              // split operation. Use the utility to find tmp files at the table level
 +              deadCompactionDetector.addTableId(extent.tableId());
 +            }
 +          }
 +        }
 +      });
 +    }
 +
 +    compactions.forEach((k, v) -> recordCompletion(k));
 +  }
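
The tmp-file cleanup above keys off a "_tmp_<ecid>" filename suffix. A standalone sketch
of that listing-and-delete step, assuming a Hadoop client on the classpath and a
hypothetical local directory and ecid (the real code derives both from the tablet
metadata and the volume manager):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class TmpFileCleanupSketch {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.getLocal(new Configuration());
        String fileSuffix = "_tmp_" + "ECID-1234";         // hypothetical ecid
        Path tabletDir = new Path("/tmp/accumulo-tablet"); // hypothetical tablet dir

        if (!fs.exists(tabletDir)) {
          return;
        }
        // list only files whose names end with the compaction's tmp suffix
        FileStatus[] files = fs.listStatus(tabletDir, p -> p.getName().endsWith(fileSuffix));
        for (FileStatus file : files) {
          // non-recursive delete, matching compactionFailedForLevel
          if (!fs.delete(file.getPath(), false)) {
            System.err.println("Unable to delete " + file.getPath());
          }
        }
      }
    }
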
 +
 +  /**
 +   * Compactor calls to update the status of the assigned compaction
 +   *
 +   * @param tinfo trace info
 +   * @param credentials tcredentials object
 +   * @param externalCompactionId compaction id
 +   * @param update compaction status update
 +   * @param timestamp timestamp of the message
 +   * @throws ThriftSecurityException when permission error
 +   */
 +  @Override
 +  public void updateCompactionStatus(TInfo tinfo, TCredentials credentials,
 +      String externalCompactionId, TCompactionStatusUpdate update, long timestamp)
 +      throws ThriftSecurityException {
 +    // do not expect users to call this directly, expect other tservers to call this method
 +    if (!security.canPerformSystemActions(credentials)) {
 +      throw new AccumuloSecurityException(credentials.getPrincipal(),
 +          SecurityErrorCode.PERMISSION_DENIED).asThriftException();
 +    }
 +    LOG.debug("Compaction status update, id: {}, timestamp: {}, update: {}", 
externalCompactionId,
 +        timestamp, update);
 +    final RunningCompaction rc = 
RUNNING_CACHE.get(ExternalCompactionId.of(externalCompactionId));
 +    if (null != rc) {
 +      rc.addUpdate(timestamp, update);
 +    }
 +  }
 +
 +  public void recordCompletion(ExternalCompactionId ecid) {
 +    var rc = RUNNING_CACHE.remove(ecid);
 +    if (rc != null) {
 +      completed.put(ecid, rc);
 +    }
 +  }
 +
 +  protected Set<ExternalCompactionId> readExternalCompactionIds() {
 +    try (TabletsMetadata tabletsMetadata =
 +        this.ctx.getAmple().readTablets().forLevel(Ample.DataLevel.USER)
 +            .filter(new HasExternalCompactionsFilter()).fetch(ECOMP).build()) {
 +      return tabletsMetadata.stream().flatMap(tm -> tm.getExternalCompactions().keySet().stream())
 +          .collect(Collectors.toSet());
 +    }
 +  }
 +
 +  /**
 +   * The RUNNING_CACHE set may contain external compactions that are not actually running. This
 +   * method periodically cleans those up.
 +   */
 +  public void cleanUpRunning() {
 +
 +    // grab a snapshot of the ids in the set before reading the metadata table. This is done to
 +    // avoid removing things that are added while reading the metadata.
 +    Set<ExternalCompactionId> idsSnapshot = Set.copyOf(RUNNING_CACHE.keySet());
 +
 +    // grab the ids that are listed as running in the metadata table. It is important that this
 +    // is done after getting the snapshot.
 +    Set<ExternalCompactionId> idsInMetadata = readExternalCompactionIds();
 +
 +    var idsToRemove = Sets.difference(idsSnapshot, idsInMetadata);
 +
 +    // remove ids that are in the running set but not in the metadata table
 +    idsToRemove.forEach(this::recordCompletion);
 +
 +    if (!idsToRemove.isEmpty()) {
 +      LOG.debug("Removed stale entries from RUNNING_CACHE : {}", idsToRemove);
 +    }
 +  }
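
The snapshot-before-read ordering is what makes this safe: an id added to RUNNING_CACHE
after the snapshot can never land in idsToRemove. A minimal sketch of the idiom with
plain strings in place of ECIDs, using Guava's Sets.difference as above:

    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    import com.google.common.collect.Sets;

    public class CleanupSketch {
      public static void main(String[] args) {
        Map<String,Object> running = new ConcurrentHashMap<>();
        running.put("ecid1", new Object());
        running.put("ecid2", new Object());

        // 1. snapshot first, so concurrent additions are never removal candidates
        Set<String> snapshot = Set.copyOf(running.keySet());

        // 2. then read the authoritative source (the metadata table in the real code)
        Set<String> inMetadata = Set.of("ecid1");

        // 3. anything in the snapshot but absent from metadata is stale
        Sets.difference(snapshot, inMetadata).forEach(running::remove);
        System.out.println(running.keySet()); // prints [ecid1]
      }
    }
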
 +
 +  /**
 +   * Return information about running compactions
 +   *
 +   * @param tinfo trace info
 +   * @param credentials tcredentials object
 +   * @return map of ECID to TExternalCompaction objects
 +   * @throws ThriftSecurityException permission error
 +   */
 +  @Override
 +  public TExternalCompactionList getRunningCompactions(TInfo tinfo, TCredentials credentials)
 +      throws ThriftSecurityException {
 +    // do not expect users to call this directly, expect other tservers to call this method
 +    if (!security.canPerformSystemActions(credentials)) {
 +      throw new AccumuloSecurityException(credentials.getPrincipal(),
 +          SecurityErrorCode.PERMISSION_DENIED).asThriftException();
 +    }
 +
 +    final TExternalCompactionList result = new TExternalCompactionList();
 +    RUNNING_CACHE.forEach((ecid, rc) -> {
 +      TExternalCompaction trc = new TExternalCompaction();
 +      trc.setGroupName(rc.getGroupName());
 +      trc.setCompactor(rc.getCompactorAddress());
 +      trc.setUpdates(rc.getUpdates());
 +      trc.setJob(rc.getJob());
 +      result.putToCompactions(ecid.canonical(), trc);
 +    });
 +    return result;
 +  }
 +
 +  /**
 +   * Return information about recently completed compactions
 +   *
 +   * @param tinfo trace info
 +   * @param credentials tcredentials object
 +   * @return map of ECID to TExternalCompaction objects
 +   * @throws ThriftSecurityException permission error
 +   */
 +  @Override
 +  public TExternalCompactionList getCompletedCompactions(TInfo tinfo, TCredentials credentials)
 +      throws ThriftSecurityException {
 +    // do not expect users to call this directly, expect other tservers to call this method
 +    if (!security.canPerformSystemActions(credentials)) {
 +      throw new AccumuloSecurityException(credentials.getPrincipal(),
 +          SecurityErrorCode.PERMISSION_DENIED).asThriftException();
 +    }
 +    final TExternalCompactionList result = new TExternalCompactionList();
 +    completed.asMap().forEach((ecid, rc) -> {
 +      TExternalCompaction trc = new TExternalCompaction();
 +      trc.setGroupName(rc.getGroupName());
 +      trc.setCompactor(rc.getCompactorAddress());
 +      trc.setJob(rc.getJob());
 +      trc.setUpdates(rc.getUpdates());
 +      result.putToCompactions(ecid.canonical(), trc);
 +    });
 +    return result;
 +  }
 +
 +  @Override
 +  public void cancel(TInfo tinfo, TCredentials credentials, String externalCompactionId)
 +      throws TException {
 +    var runningCompaction = RUNNING_CACHE.get(ExternalCompactionId.of(externalCompactionId));
 +    var extent = KeyExtent.fromThrift(runningCompaction.getJob().getExtent());
 +    try {
 +      NamespaceId nsId = this.ctx.getNamespaceId(extent.tableId());
 +      if (!security.canCompact(credentials, extent.tableId(), nsId)) {
 +        throw new AccumuloSecurityException(credentials.getPrincipal(),
 +            SecurityErrorCode.PERMISSION_DENIED).asThriftException();
 +      }
 +    } catch (TableNotFoundException e) {
 +      throw new ThriftTableOperationException(extent.tableId().canonical(), null,
 +          TableOperation.COMPACT_CANCEL, TableOperationExceptionType.NOTFOUND, e.getMessage());
 +    }
 +
 +    cancelCompactionOnCompactor(runningCompaction.getCompactorAddress(), externalCompactionId);
 +  }
 +
 +  /* Method exists to be called from test */
 +  public CompactionJobQueues getJobQueues() {
 +    return jobQueues;
 +  }
 +
 +  /* Method exists to be overridden in test to hide static method */
 +  protected List<RunningCompaction> getCompactionsRunningOnCompactors() {
 +    return ExternalCompactionUtil.getCompactionsRunningOnCompactors(this.ctx);
 +  }
 +
 +  /* Method exists to be overridden in test to hide static method */
 +  protected void cancelCompactionOnCompactor(String address, String externalCompactionId) {
 +    HostAndPort hostPort = HostAndPort.fromString(address);
 +    ExternalCompactionUtil.cancelCompaction(this.ctx, hostPort, externalCompactionId);
 +  }
 +
 +  private void deleteEmpty(ZooReaderWriter zoorw, String path)
 +      throws KeeperException, InterruptedException {
 +    try {
 +      LOG.debug("Deleting empty ZK node {}", path);
 +      zoorw.delete(path);
 +    } catch (KeeperException.NotEmptyException e) {
 +      LOG.debug("Failed to delete {} its not empty, likely an expected race 
condition.", path);
 +    }
 +  }
 +
 +  private void cleanUpCompactors() {
 +    final String compactorQueuesPath = this.ctx.getZooKeeperRoot() + Constants.ZCOMPACTORS;
 +
 +    var zoorw = this.ctx.getZooReaderWriter();
 +
 +    try {
 +      var groups = zoorw.getChildren(compactorQueuesPath);
 +
 +      for (String group : groups) {
 +        String qpath = compactorQueuesPath + "/" + group;
 +
 +        var compactors = zoorw.getChildren(qpath);
 +
 +        if (compactors.isEmpty()) {
 +          deleteEmpty(zoorw, qpath);
 +        }
 +
 +        for (String compactor : compactors) {
 +          String cpath = compactorQueuesPath + "/" + group + "/" + compactor;
 +          var lockNodes = zoorw.getChildren(cpath);
 +          if (lockNodes.isEmpty()) {
 +            deleteEmpty(zoorw, cpath);
 +          }
 +        }
 +      }
 +
 +    } catch (KeeperException | RuntimeException e) {
 +      LOG.warn("Failed to clean up compactors", e);
 +    } catch (InterruptedException e) {
 +      Thread.currentThread().interrupt();
 +      throw new IllegalStateException(e);
 +    }
 +  }
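
cleanUpCompactors() relies on treating NotEmptyException as a benign race rather than
taking a lock: if a compactor registers between getChildren() and delete(), the delete
simply fails and the node is kept. A sketch of that idiom against the raw ZooKeeper
client (the real code goes through Accumulo's ZooReaderWriter):

    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.ZooKeeper;

    public class PruneEmptyNodesSketch {
      // delete a znode only if it is still childless; a concurrent child creation wins
      static void deleteIfEmpty(ZooKeeper zk, String path)
          throws KeeperException, InterruptedException {
        try {
          zk.delete(path, -1); // -1 = ignore version
        } catch (KeeperException.NotEmptyException e) {
          // a child appeared between getChildren() and delete(); expected race, keep node
        }
      }
    }
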
 +
 +  private static Set<StoredTabletFile> getFilesReservedBySelection(TabletMetadata tabletMetadata,
 +      SteadyTime steadyTime, ServerContext ctx) {
 +    if (tabletMetadata.getSelectedFiles() == null) {
 +      return Set.of();
 +    }
 +
 +    if (tabletMetadata.getSelectedFiles().getCompletedJobs() > 0) {
 +      return tabletMetadata.getSelectedFiles().getFiles();
 +    }
 +
 +    long selectedExpirationDuration = ctx.getTableConfiguration(tabletMetadata.getTableId())
 +        .getTimeInMillis(Property.TABLE_COMPACTION_SELECTION_EXPIRATION);
 +
 +    if (steadyTime.minus(tabletMetadata.getSelectedFiles().getSelectedTime()).toMillis()
 +        < selectedExpirationDuration) {
 +      return tabletMetadata.getSelectedFiles().getFiles();
 +    }
 +
 +    return Set.of();
 +  }
 +}
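
getFilesReservedBySelection() encodes a three-way rule: no selection reserves nothing,
a selection with completed jobs reserves its files indefinitely, and an untouched
selection only reserves files until it ages out. A sketch of that rule using
java.time.Duration in place of SteadyTime, with a hypothetical 100ms expiration:

    import java.time.Duration;

    public class SelectionExpirySketch {
      static boolean selectionReservesFiles(Duration now, Duration selectedTime,
          long completedJobs, long expirationMillis) {
        if (completedJobs > 0) {
          return true; // a user compaction job already completed; never expire the selection
        }
        // otherwise the selection only holds until it ages out
        return now.minus(selectedTime).toMillis() < expirationMillis;
      }

      public static void main(String[] args) {
        Duration selected = Duration.ofMillis(100);
        System.out.println(selectionReservesFiles(Duration.ofMillis(150), selected, 0, 100)); // true
        System.out.println(selectionReservesFiles(Duration.ofMillis(300), selected, 0, 100)); // false
        System.out.println(selectionReservesFiles(Duration.ofMillis(300), selected, 1, 100)); // true
      }
    }
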
diff --cc server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
index 962ac18a13,0000000000..5c7a913d91
mode 100644,000000..100644
--- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
+++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
@@@ -1,606 -1,0 +1,615 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.manager.compaction;
 +
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.ECOMP;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.USER_COMPACTION_REQUESTED;
 +import static org.apache.accumulo.manager.compaction.coordinator.CompactionCoordinator.canReserveCompaction;
 +import static org.easymock.EasyMock.anyObject;
 +import static org.easymock.EasyMock.createMock;
 +import static org.easymock.EasyMock.expect;
 +import static org.easymock.EasyMock.expectLastCall;
 +import static org.easymock.EasyMock.replay;
 +import static org.junit.jupiter.api.Assertions.assertEquals;
 +import static org.junit.jupiter.api.Assertions.assertFalse;
 +import static org.junit.jupiter.api.Assertions.assertNull;
 +import static org.junit.jupiter.api.Assertions.assertTrue;
 +
 +import java.net.URI;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Optional;
 +import java.util.Set;
 +import java.util.UUID;
 +import java.util.concurrent.ScheduledThreadPoolExecutor;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicReference;
 +import java.util.stream.Collectors;
 +
 +import org.apache.accumulo.core.client.admin.CompactionConfig;
 +import org.apache.accumulo.core.clientImpl.thrift.TInfo;
 +import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
 +import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
++import org.apache.accumulo.core.compaction.thrift.TNextCompactionJob;
 +import org.apache.accumulo.core.conf.DefaultConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.TableId;
 +import org.apache.accumulo.core.dataImpl.KeyExtent;
 +import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
 +import org.apache.accumulo.core.fate.Fate;
 +import org.apache.accumulo.core.fate.FateId;
 +import org.apache.accumulo.core.fate.FateInstanceType;
 +import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil;
 +import org.apache.accumulo.core.manager.state.tables.TableState;
 +import org.apache.accumulo.core.metadata.CompactableFileImpl;
 +import org.apache.accumulo.core.metadata.ReferencedTabletFile;
 +import org.apache.accumulo.core.metadata.StoredTabletFile;
 +import org.apache.accumulo.core.metadata.schema.CompactionMetadata;
 +import org.apache.accumulo.core.metadata.schema.DataFileValue;
 +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
 +import org.apache.accumulo.core.metadata.schema.SelectedFiles;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 +import org.apache.accumulo.core.metadata.schema.TabletOperationId;
 +import org.apache.accumulo.core.metadata.schema.TabletOperationType;
 +import org.apache.accumulo.core.metrics.MetricsInfo;
 +import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
 +import org.apache.accumulo.core.spi.compaction.CompactionJob;
 +import org.apache.accumulo.core.spi.compaction.CompactionKind;
 +import org.apache.accumulo.core.spi.compaction.CompactorGroupId;
 +import org.apache.accumulo.core.tabletserver.thrift.TCompactionKind;
 +import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats;
 +import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
 +import org.apache.accumulo.core.trace.TraceUtil;
 +import org.apache.accumulo.core.util.cache.Caches;
 +import org.apache.accumulo.core.util.compaction.CompactionJobImpl;
 +import org.apache.accumulo.core.util.compaction.RunningCompaction;
 +import org.apache.accumulo.core.util.time.SteadyTime;
 +import org.apache.accumulo.manager.Manager;
 +import org.apache.accumulo.manager.compaction.coordinator.CompactionCoordinator;
 +import org.apache.accumulo.manager.compaction.queue.CompactionJobPriorityQueue;
 +import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues.MetaJob;
 +import org.apache.accumulo.server.ServerContext;
 +import org.apache.accumulo.server.conf.TableConfiguration;
 +import org.apache.accumulo.server.security.AuditedSecurityOperation;
 +import org.apache.accumulo.server.security.SecurityOperation;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.Text;
 +import org.easymock.EasyMock;
 +import org.junit.jupiter.api.Test;
 +
 +import com.google.common.net.HostAndPort;
 +
 +public class CompactionCoordinatorTest {
 +
 +  // Need a non-null fateInstances reference for CompactionCoordinator.compactionCompleted
 +  private static final AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances =
 +      new AtomicReference<>(Map.of());
 +
 +  private static final CompactorGroupId GROUP_ID = CompactorGroupId.of("R2DQ");
 +
 +  private final HostAndPort tserverAddr = HostAndPort.fromParts("192.168.1.1", 9090);
 +
 +  public MetricsInfo getMockMetrics() {
 +    MetricsInfo metricsInfo = createMock(MetricsInfo.class);
 +    metricsInfo.addServiceTags(anyObject(), anyObject(), anyObject());
 +    expectLastCall().anyTimes();
 +    metricsInfo.addMetricsProducers(anyObject());
 +    expectLastCall().anyTimes();
 +    metricsInfo.init();
 +    expectLastCall().anyTimes();
 +    replay(metricsInfo);
 +    return metricsInfo;
 +  }
 +
 +  public class TestCoordinator extends CompactionCoordinator {
 +
 +    private final List<RunningCompaction> runningCompactions;
 +
 +    private Set<ExternalCompactionId> metadataCompactionIds = null;
 +
 +    public TestCoordinator(ServerContext ctx, SecurityOperation security,
 +        List<RunningCompaction> runningCompactions, Manager manager) {
 +      super(ctx, security, fateInstances, "TEST_GROUP", manager);
 +      this.runningCompactions = runningCompactions;
 +    }
 +
++    @Override
++    protected int countCompactors(String groupName) {
++      return 3;
++    }
++
 +    @Override
 +    protected void startDeadCompactionDetector() {}
 +
 +    @Override
 +    protected long getTServerCheckInterval() {
 +      return 5000L;
 +    }
 +
 +    @Override
 +    protected void startCompactionCleaner(ScheduledThreadPoolExecutor schedExecutor) {}
 +
 +    @Override
 +    protected void startRunningCleaner(ScheduledThreadPoolExecutor schedExecutor) {}
 +
 +    @Override
 +    protected void startIdleCompactionWatcher() {
 +      // This is called from CompactionCoordinator.run(). Counting down
 +      // the latch will exit the run method
 +      this.shutdown.countDown();
 +    }
 +
 +    @Override
 +    public void compactionCompleted(TInfo tinfo, TCredentials credentials,
 +        String externalCompactionId, TKeyExtent textent, TCompactionStats stats)
 +        throws ThriftSecurityException {}
 +
 +    @Override
 +    public void compactionFailed(TInfo tinfo, TCredentials credentials, String externalCompactionId,
 +        TKeyExtent extent) throws ThriftSecurityException {}
 +
 +    void setMetadataCompactionIds(Set<ExternalCompactionId> mci) {
 +      metadataCompactionIds = mci;
 +    }
 +
 +    @Override
 +    protected Set<ExternalCompactionId> readExternalCompactionIds() {
 +      if (metadataCompactionIds == null) {
 +        return RUNNING_CACHE.keySet();
 +      } else {
 +        return metadataCompactionIds;
 +      }
 +    }
 +
 +    public Map<ExternalCompactionId,RunningCompaction> getRunning() {
 +      return RUNNING_CACHE;
 +    }
 +
 +    public void resetInternals() {
 +      getRunning().clear();
 +      metadataCompactionIds = null;
 +    }
 +
 +    @Override
 +    protected List<RunningCompaction> getCompactionsRunningOnCompactors() {
 +      return runningCompactions;
 +    }
 +
 +    @Override
 +    protected CompactionMetadata reserveCompaction(MetaJob metaJob, String compactorAddress,
 +        ExternalCompactionId externalCompactionId) {
 +      return createExternalCompactionMetadata(metaJob.getJob(),
 +          metaJob.getJob().getFiles().stream().map(CompactableFileImpl::toStoredTabletFile)
 +              .collect(Collectors.toSet()),
 +          metaJob.getTabletMetadata(), compactorAddress, externalCompactionId);
 +    }
 +
 +    @Override
 +    protected CompactionMetadata createExternalCompactionMetadata(CompactionJob job,
 +        Set<StoredTabletFile> jobFiles, TabletMetadata tablet, String compactorAddress,
 +        ExternalCompactionId externalCompactionId) {
 +      FateInstanceType type = FateInstanceType.fromTableId(tablet.getExtent().tableId());
 +      FateId fateId = FateId.from(type, UUID.randomUUID());
 +      return new CompactionMetadata(jobFiles,
 +          new ReferencedTabletFile(new Path("file:///accumulo/tables/1/default_tablet/F00001.rf")),
 +          compactorAddress, job.getKind(), job.getPriority(), job.getGroup(), true, fateId);
 +    }
 +
 +    @Override
 +    protected TExternalCompactionJob createThriftJob(String externalCompactionId,
 +        CompactionMetadata ecm, MetaJob metaJob, Optional<CompactionConfig> compactionConfig) {
 +      return new TExternalCompactionJob(externalCompactionId,
 +          metaJob.getTabletMetadata().getExtent().toThrift(), List.of(),
 +          SystemIteratorUtil.toIteratorConfig(List.of()),
 +          ecm.getCompactTmpName().getNormalizedPathStr(), ecm.getPropagateDeletes(),
 +          TCompactionKind.valueOf(ecm.getKind().name()),
 +          FateId
 +              .from(FateInstanceType.fromTableId(metaJob.getTabletMetadata().getExtent().tableId()),
 +                  UUID.randomUUID())
 +              .toThrift(),
 +          Map.of());
 +    }
 +
 +    @Override
 +    protected void cancelCompactionOnCompactor(String address, String externalCompactionId) {}
 +
 +  }
 +
 +  @Test
 +  public void testCoordinatorColdStart() throws Exception {
 +
 +    ServerContext context = EasyMock.createNiceMock(ServerContext.class);
 +    expect(context.getCaches()).andReturn(Caches.getInstance()).anyTimes();
 +    expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
 +
 +    MetricsInfo metricsInfo = getMockMetrics();
 +    expect(context.getMetricsInfo()).andReturn(metricsInfo).anyTimes();
 +
 +    AuditedSecurityOperation security = EasyMock.createNiceMock(AuditedSecurityOperation.class);
 +
 +    Manager manager = EasyMock.createNiceMock(Manager.class);
 +    expect(manager.getSteadyTime()).andReturn(SteadyTime.from(100000, TimeUnit.NANOSECONDS))
 +        .anyTimes();
 +
 +    EasyMock.replay(context, security, manager);
 +
 +    var coordinator = new TestCoordinator(context, security, new ArrayList<>(), manager);
 +    assertEquals(0, coordinator.getJobQueues().getQueuedJobCount());
 +    assertEquals(0, coordinator.getRunning().size());
 +    coordinator.run();
 +    coordinator.shutdown();
 +
 +    assertEquals(0, coordinator.getJobQueues().getQueuedJobCount());
 +    assertEquals(0, coordinator.getRunning().size());
 +    EasyMock.verify(context, security, metricsInfo);
 +  }
 +
 +  @Test
 +  public void testCoordinatorRestartOneRunningCompaction() throws Exception {
 +
 +    ServerContext context = EasyMock.createNiceMock(ServerContext.class);
 +    expect(context.getCaches()).andReturn(Caches.getInstance()).anyTimes();
 +    expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
 +
 +    MetricsInfo metricsInfo = getMockMetrics();
 +    expect(context.getMetricsInfo()).andReturn(metricsInfo).anyTimes();
 +
 +    List<RunningCompaction> runningCompactions = new ArrayList<>();
 +    ExternalCompactionId eci = ExternalCompactionId.generate(UUID.randomUUID());
 +    TExternalCompactionJob job = EasyMock.createNiceMock(TExternalCompactionJob.class);
 +    expect(job.getExternalCompactionId()).andReturn(eci.toString()).anyTimes();
 +    TKeyExtent extent = new TKeyExtent();
 +    extent.setTable("1".getBytes());
 +    runningCompactions.add(new RunningCompaction(job, tserverAddr.toString(), GROUP_ID.toString()));
 +
 +    AuditedSecurityOperation security = EasyMock.createNiceMock(AuditedSecurityOperation.class);
 +
 +    Manager manager = EasyMock.createNiceMock(Manager.class);
 +    expect(manager.getSteadyTime()).andReturn(SteadyTime.from(100000, TimeUnit.NANOSECONDS))
 +        .anyTimes();
 +
 +    EasyMock.replay(context, job, security, manager);
 +
 +    var coordinator = new TestCoordinator(context, security, runningCompactions, manager);
 +    coordinator.resetInternals();
 +    assertEquals(0, coordinator.getJobQueues().getQueuedJobCount());
 +    assertEquals(0, coordinator.getRunning().size());
 +    coordinator.run();
 +    coordinator.shutdown();
 +    assertEquals(0, coordinator.getJobQueues().getQueuedJobCount());
 +    assertEquals(1, coordinator.getRunning().size());
 +
 +    Map<ExternalCompactionId,RunningCompaction> running = coordinator.getRunning();
 +    Entry<ExternalCompactionId,RunningCompaction> ecomp = running.entrySet().iterator().next();
 +    assertEquals(eci, ecomp.getKey());
 +    RunningCompaction rc = ecomp.getValue();
 +    assertEquals(GROUP_ID.toString(), rc.getGroupName());
 +    assertEquals(tserverAddr.toString(), rc.getCompactorAddress());
 +
 +    EasyMock.verify(context, job, security);
 +  }
 +
 +  @Test
 +  public void testGetCompactionJob() throws Exception {
 +
 +    TableConfiguration tconf = EasyMock.createNiceMock(TableConfiguration.class);
 +    expect(tconf.get(Property.TABLE_COMPACTION_CONFIGURER))
 +        .andReturn(Property.TABLE_COMPACTION_CONFIGURER.getDefaultValue()).anyTimes();
 +
 +    ServerContext context = EasyMock.createNiceMock(ServerContext.class);
 +    expect(context.getCaches()).andReturn(Caches.getInstance()).anyTimes();
 +    expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
 +    expect(context.getTableConfiguration(TableId.of("2a"))).andReturn(tconf).anyTimes();
 +
 +    MetricsInfo metricsInfo = getMockMetrics();
 +    expect(context.getMetricsInfo()).andReturn(metricsInfo).anyTimes();
 +
 +    TCredentials creds = EasyMock.createNiceMock(TCredentials.class);
 +    expect(context.rpcCreds()).andReturn(creds).anyTimes();
 +
 +    AuditedSecurityOperation security = EasyMock.createNiceMock(AuditedSecurityOperation.class);
 +    expect(security.canPerformSystemActions(creds)).andReturn(true).anyTimes();
 +
 +    KeyExtent ke = new KeyExtent(TableId.of("2a"), new Text("z"), new Text("b"));
 +    TabletMetadata tm = EasyMock.createNiceMock(TabletMetadata.class);
 +    expect(tm.getExtent()).andReturn(ke).anyTimes();
 +    expect(tm.getFiles()).andReturn(Collections.emptySet()).anyTimes();
 +    expect(tm.getTableId()).andReturn(ke.tableId()).anyTimes();
 +    Manager manager = EasyMock.createNiceMock(Manager.class);
 +    expect(manager.getSteadyTime()).andReturn(SteadyTime.from(100000, TimeUnit.NANOSECONDS))
 +        .anyTimes();
 +
 +    EasyMock.replay(tconf, context, creds, tm, security, manager);
 +
 +    var coordinator = new TestCoordinator(context, security, new ArrayList<>(), manager);
 +    assertEquals(0, coordinator.getJobQueues().getQueuedJobCount());
 +    assertEquals(0, coordinator.getRunning().size());
 +    // Use coordinator.run() to populate the internal data structures. This is tested in a
 +    // different test.
 +    coordinator.run();
 +    coordinator.shutdown();
 +
 +    assertEquals(0, coordinator.getJobQueues().getQueuedJobCount());
 +    assertEquals(0, coordinator.getRunning().size());
 +
 +    // Add a job to the job queue
 +    CompactionJob job = new CompactionJobImpl((short) 1, GROUP_ID, Collections.emptyList(),
 +        CompactionKind.SYSTEM, Optional.of(true));
 +    coordinator.addJobs(tm, Collections.singleton(job));
 +    CompactionJobPriorityQueue queue = coordinator.getJobQueues().getQueue(GROUP_ID);
 +    assertEquals(1, queue.getQueuedJobs());
 +    assertEquals(1, queue.getQueuedJobs());
 +
 +    // Get the next job
 +    ExternalCompactionId eci = ExternalCompactionId.generate(UUID.randomUUID());
-     TExternalCompactionJob createdJob = coordinator.getCompactionJob(new TInfo(), creds,
++    TNextCompactionJob nextJob = coordinator.getCompactionJob(new TInfo(), creds,
 +        GROUP_ID.toString(), "localhost:10241", eci.toString());
++    assertEquals(3, nextJob.getCompactorCount());
++    TExternalCompactionJob createdJob = nextJob.getJob();
 +    assertEquals(eci.toString(), createdJob.getExternalCompactionId());
 +    assertEquals(ke, KeyExtent.fromThrift(createdJob.getExtent()));
 +
 +    assertEquals(0, coordinator.getJobQueues().getQueuedJobCount());
 +    assertEquals(1, coordinator.getRunning().size());
 +    Entry<ExternalCompactionId,RunningCompaction> entry =
 +        coordinator.getRunning().entrySet().iterator().next();
 +    assertEquals(eci.toString(), entry.getKey().toString());
 +    assertEquals("localhost:10241", entry.getValue().getCompactorAddress());
 +    assertEquals(eci.toString(), entry.getValue().getJob().getExternalCompactionId());
 +
 +    EasyMock.verify(tconf, context, creds, tm, security);
 +  }
 +
 +  @Test
 +  public void testGetCompactionJobNoJobs() throws Exception {
 +
 +    ServerContext context = EasyMock.createNiceMock(ServerContext.class);
 +    expect(context.getCaches()).andReturn(Caches.getInstance()).anyTimes();
 +    expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
 +
 +    TCredentials creds = EasyMock.createNiceMock(TCredentials.class);
 +
 +    AuditedSecurityOperation security = EasyMock.createNiceMock(AuditedSecurityOperation.class);
 +    expect(security.canPerformSystemActions(creds)).andReturn(true);
 +
 +    Manager manager = EasyMock.createNiceMock(Manager.class);
 +    expect(manager.getSteadyTime()).andReturn(SteadyTime.from(100000, TimeUnit.NANOSECONDS))
 +        .anyTimes();
 +
 +    EasyMock.replay(context, creds, security, manager);
 +
 +    var coordinator = new TestCoordinator(context, security, new ArrayList<>(), manager);
-     TExternalCompactionJob job = coordinator.getCompactionJob(TraceUtil.traceInfo(), creds,
++    TNextCompactionJob nextJob = coordinator.getCompactionJob(TraceUtil.traceInfo(), creds,
 +        GROUP_ID.toString(), "localhost:10240", UUID.randomUUID().toString());
-     assertNull(job.getExternalCompactionId());
++    assertEquals(3, nextJob.getCompactorCount());
++    assertNull(nextJob.getJob().getExternalCompactionId());
 +
 +    EasyMock.verify(context, creds, security);
 +  }
 +
 +  @Test
 +  public void testCleanUpRunning() throws Exception {
 +
 +    ServerContext context = EasyMock.createNiceMock(ServerContext.class);
 +    expect(context.getCaches()).andReturn(Caches.getInstance()).anyTimes();
 +    expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
 +
 +    TCredentials creds = EasyMock.createNiceMock(TCredentials.class);
 +
 +    AuditedSecurityOperation security = EasyMock.createNiceMock(AuditedSecurityOperation.class);
 +    Manager manager = EasyMock.createNiceMock(Manager.class);
 +    expect(manager.getSteadyTime()).andReturn(SteadyTime.from(100000, TimeUnit.NANOSECONDS))
 +        .anyTimes();
 +
 +    EasyMock.replay(context, creds, security, manager);
 +
 +    TestCoordinator coordinator =
 +        new TestCoordinator(context, security, new ArrayList<>(), manager);
 +
 +    var ecid1 = ExternalCompactionId.generate(UUID.randomUUID());
 +    var ecid2 = ExternalCompactionId.generate(UUID.randomUUID());
 +    var ecid3 = ExternalCompactionId.generate(UUID.randomUUID());
 +
 +    coordinator.getRunning().put(ecid1, new RunningCompaction(new TExternalCompaction()));
 +    coordinator.getRunning().put(ecid2, new RunningCompaction(new TExternalCompaction()));
 +    coordinator.getRunning().put(ecid3, new RunningCompaction(new TExternalCompaction()));
 +
 +    coordinator.cleanUpRunning();
 +
 +    assertEquals(Set.of(ecid1, ecid2, ecid3), coordinator.getRunning().keySet());
 +
 +    coordinator.setMetadataCompactionIds(Set.of(ecid1, ecid2));
 +
 +    coordinator.cleanUpRunning();
 +
 +    assertEquals(Set.of(ecid1, ecid2), coordinator.getRunning().keySet());
 +
 +    EasyMock.verify(context, creds, security);
 +
 +  }
 +
 +  @Test
 +  public void testCanReserve() throws Exception {
 +    TableId tableId1 = TableId.of("5");
 +    TableId tableId2 = TableId.of("6");
 +
 +    var file1 = StoredTabletFile.of(new URI("file:///accumulo/tables/1/default_tablet/F00001.rf"));
 +    var file2 = StoredTabletFile.of(new URI("file:///accumulo/tables/1/default_tablet/F00002.rf"));
 +    var file3 = StoredTabletFile.of(new URI("file:///accumulo/tables/1/default_tablet/F00003.rf"));
 +    var file4 = StoredTabletFile.of(new URI("file:///accumulo/tables/1/default_tablet/F00004.rf"));
 +
 +    ServerContext context = EasyMock.mock(ServerContext.class);
 +    EasyMock.expect(context.getTableState(tableId1)).andReturn(TableState.ONLINE).atLeastOnce();
 +    EasyMock.expect(context.getTableState(tableId2)).andReturn(TableState.OFFLINE).atLeastOnce();
 +
 +    TableConfiguration tableConf = EasyMock.createMock(TableConfiguration.class);
 +    EasyMock.expect(tableConf.getTimeInMillis(Property.TABLE_COMPACTION_SELECTION_EXPIRATION))
 +        .andReturn(100L).atLeastOnce();
 +
 +    EasyMock.expect(context.getTableConfiguration(anyObject())).andReturn(tableConf).atLeastOnce();
 +
 +    FateId fateId1 = FateId.from(FateInstanceType.USER, UUID.randomUUID());
 +
 +    CompactorGroupId cgid = CompactorGroupId.of("G1");
 +    ReferencedTabletFile tmp1 =
 +        ReferencedTabletFile.of(new Path("file:///accumulo/tables/1/default_tablet/C00005.rf_tmp"));
 +    CompactionMetadata cm1 = new CompactionMetadata(Set.of(file1, file2), tmp1, "localhost:4444",
 +        CompactionKind.SYSTEM, (short) 5, cgid, false, null);
 +
 +    ReferencedTabletFile tmp2 =
 +        ReferencedTabletFile.of(new Path("file:///accumulo/tables/1/default_tablet/C00006.rf_tmp"));
 +    CompactionMetadata cm2 = new CompactionMetadata(Set.of(file3), tmp2, "localhost:5555",
 +        CompactionKind.USER, (short) 5, cgid, false, fateId1);
 +
 +    EasyMock.replay(context, tableConf);
 +
 +    KeyExtent extent1 = new KeyExtent(tableId1, null, null);
 +
 +    var dfv = new DataFileValue(1000, 100);
 +
 +    var cid1 = ExternalCompactionId.generate(UUID.randomUUID());
 +    var cid2 = ExternalCompactionId.generate(UUID.randomUUID());
 +
 +    var selectedWithoutComp = new SelectedFiles(Set.of(file1, file2, file3), false, fateId1,
 +        SteadyTime.from(100, TimeUnit.SECONDS));
 +    var selectedWithComp = new SelectedFiles(Set.of(file1, file2, file3), false, fateId1, 1,
 +        SteadyTime.from(100, TimeUnit.SECONDS));
 +
 +    var time = SteadyTime.from(1000, TimeUnit.SECONDS);
 +
 +    // should not be able to compact an offline table
 +    var tabletOffline = TabletMetadata.builder(new KeyExtent(tableId2, null, null))
 +        .putFile(file1, dfv).putFile(file2, dfv).putFile(file3, dfv).putFile(file4, dfv)
 +        .build(OPID, ECOMP, USER_COMPACTION_REQUESTED, SELECTED);
 +    assertFalse(canReserveCompaction(tabletOffline, CompactionKind.SYSTEM, Set.of(file1, file2),
 +        context, time));
 +
 +    // nothing should prevent this compaction
 +    var tablet1 =
 +        TabletMetadata.builder(extent1).putFile(file1, dfv).putFile(file2, dfv).putFile(file3, dfv)
 +            .putFile(file4, dfv).build(OPID, ECOMP, USER_COMPACTION_REQUESTED, SELECTED);
 +    assertTrue(
 +        canReserveCompaction(tablet1, CompactionKind.SYSTEM, Set.of(file1, file2), context, time));
 +
 +    // should not be able to do a user compaction unless selected files are present
 +    assertFalse(
 +        canReserveCompaction(tablet1, CompactionKind.USER, Set.of(file1, file2), context, time));
 +
 +    // should not be able to compact a tablet with user compaction request in place
 +    var tablet3 =
 +        TabletMetadata.builder(extent1).putFile(file1, dfv).putFile(file2, dfv).putFile(file3, dfv)
 +            .putFile(file4, dfv).putUserCompactionRequested(fateId1).build(OPID, ECOMP, SELECTED);
 +    assertFalse(
 +        canReserveCompaction(tablet3, CompactionKind.SYSTEM, Set.of(file1, file2), context, time));
 +
 +    // should not be able to compact a tablet when the job has files not present in the tablet
 +    var tablet4 = TabletMetadata.builder(extent1).putFile(file1, dfv).putFile(file2, dfv)
 +        .putFile(file3, dfv).build(OPID, ECOMP, USER_COMPACTION_REQUESTED, SELECTED);
 +    assertFalse(canReserveCompaction(tablet4, CompactionKind.SYSTEM, Set.of(file1, file2, file4),
 +        context, time));
 +
 +    // should not be able to compact a tablet with an operation id present
 +    TabletOperationId opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId1);
 +    var tablet5 = TabletMetadata.builder(extent1).putFile(file1, dfv).putFile(file2, dfv)
 +        .putFile(file3, dfv).putFile(file4, dfv).putOperation(opid)
 +        .build(ECOMP, USER_COMPACTION_REQUESTED, SELECTED);
 +    assertFalse(
 +        canReserveCompaction(tablet5, CompactionKind.SYSTEM, Set.of(file1, file2), context, time));
 +
 +    // should not be able to compact a tablet if the job files overlap with running compactions
 +    var tablet6 = TabletMetadata.builder(extent1).putFile(file1, dfv).putFile(file2, dfv)
 +        .putFile(file3, dfv).putFile(file4, dfv).putExternalCompaction(cid1, cm1)
 +        .putExternalCompaction(cid2, cm2).build(OPID, USER_COMPACTION_REQUESTED, SELECTED);
 +    assertFalse(
 +        canReserveCompaction(tablet6, CompactionKind.SYSTEM, Set.of(file1, file2), context, time));
 +    // should be able to compact the file that is outside of the set of files currently compacting
 +    assertTrue(canReserveCompaction(tablet6, CompactionKind.SYSTEM, Set.of(file4), context, time));
 +
 +    // create a tablet with a selected set of files
 +    var selTabletWithComp = TabletMetadata.builder(extent1).putFile(file1, dfv).putFile(file2, dfv)
 +        .putFile(file3, dfv).putFile(file4, dfv).putSelectedFiles(selectedWithComp)
 +        .build(OPID, USER_COMPACTION_REQUESTED, ECOMP);
 +    // 0 completed jobs
 +    var selTabletWithoutComp = TabletMetadata.builder(extent1).putFile(file1, dfv)
 +        .putFile(file2, dfv).putFile(file3, dfv).putFile(file4, dfv)
 +        .putSelectedFiles(selectedWithoutComp).build(OPID, USER_COMPACTION_REQUESTED, ECOMP);
 +
 +    // should be able to start a system compaction when the selection has no completed jobs,
 +    // even if the job files overlap the selected files
 +    assertTrue(canReserveCompaction(selTabletWithoutComp, CompactionKind.SYSTEM,
 +        Set.of(file1, file2), context, time));
 +    assertTrue(canReserveCompaction(selTabletWithoutComp, CompactionKind.SYSTEM,
 +        Set.of(file3, file4), context, time));
 +
 +    // should not be able to start a system compaction if the set of files overlaps with the
 +    // selected files
 +    assertFalse(canReserveCompaction(selTabletWithComp, CompactionKind.SYSTEM, Set.of(file1, file2),
 +        context, time));
 +    assertFalse(canReserveCompaction(selTabletWithComp, CompactionKind.SYSTEM, Set.of(file3, file4),
 +        context, time));
 +    // should be able to start a system compaction on the set of files not in the selected set
 +    assertTrue(canReserveCompaction(selTabletWithComp, CompactionKind.SYSTEM, Set.of(file4),
 +        context, time));
 +    // should be able to start user compactions on files that are selected
 +    assertTrue(canReserveCompaction(selTabletWithComp, CompactionKind.USER, Set.of(file1, file2),
 +        context, time));
 +    assertTrue(canReserveCompaction(selTabletWithComp, CompactionKind.USER, Set.of(file2, file3),
 +        context, time));
 +    assertTrue(canReserveCompaction(selTabletWithComp, CompactionKind.USER,
 +        Set.of(file1, file2, file3), context, time));
 +    // should not be able to start user compactions on files that fall outside of the selected set
 +    assertFalse(canReserveCompaction(selTabletWithComp, CompactionKind.USER, Set.of(file1, file4),
 +        context, time));
 +    assertFalse(
 +        canReserveCompaction(selTabletWithComp, CompactionKind.USER, Set.of(file4), context, time));
 +    assertFalse(canReserveCompaction(selTabletWithComp, CompactionKind.USER,
 +        Set.of(file1, file2, file3, file4), context, time));
 +
 +    // test selected files and running compaction
 +    var selRunningTablet = TabletMetadata.builder(extent1).putFile(file1, dfv).putFile(file2, dfv)
 +        .putFile(file3, dfv).putFile(file4, dfv).putSelectedFiles(selectedWithComp)
 +        .putExternalCompaction(cid2, cm2).build(OPID, USER_COMPACTION_REQUESTED);
 +    // should be able to compact files that are in the selected set and not in the running set
 +    assertTrue(canReserveCompaction(selRunningTablet, CompactionKind.USER, Set.of(file1, file2),
 +        context, time));
 +    // should not be able to compact because files overlap the running set
 +    assertFalse(canReserveCompaction(selRunningTablet, CompactionKind.USER, Set.of(file2, file3),
 +        context, time));
 +    // should not be able to start a system compaction if the set of files overlaps with the
 +    // selected files and/or the running set
 +    assertFalse(canReserveCompaction(selRunningTablet, CompactionKind.SYSTEM, Set.of(file1, file2),
 +        context, time));
 +    assertFalse(canReserveCompaction(selRunningTablet, CompactionKind.SYSTEM, Set.of(file3, file4),
 +        context, time));
 +    // should be able to start a system compaction on the set of files not in the selected set
 +    assertTrue(canReserveCompaction(selRunningTablet, CompactionKind.SYSTEM, Set.of(file4), context,
 +        time));
 +
 +    // should not be able to compact a tablet that does not exist
 +    assertFalse(
 +        canReserveCompaction(null, CompactionKind.SYSTEM, Set.of(file1, file2), context, time));
 +    assertFalse(
 +        canReserveCompaction(null, CompactionKind.USER, Set.of(file1, file2), context, time));
 +
 +    EasyMock.verify(context);
 +  }
 +}
