This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 6f908863f571bcb76dd77e0576ec1bd135cd0284
Merge: e7d789dc79 6dc1bcfa55
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Thu Jan 4 21:40:30 2024 -0500

    Merge branch 'main' into elasticity

 .../core/client/admin/InstanceOperations.java      |  10 +
 .../admin/compaction/CompactionConfigurer.java     |  23 +++
 .../core/clientImpl/InstanceOperationsImpl.java    |   7 +
 .../org/apache/accumulo/core/conf/Property.java    |   8 +-
 .../accumulo/core/fate/AbstractFateStore.java      |  14 +-
 .../org/apache/accumulo/core/fate/AdminUtil.java   |   5 +-
 .../org/apache/accumulo/core/fate/AgeOffStore.java |   5 +-
 .../java/org/apache/accumulo/core/fate/Fate.java   |  13 +-
 .../org/apache/accumulo/core/fate/FateStore.java   |   3 +-
 .../accumulo/core/fate/WrappedFateTxStore.java     |   5 +-
 .../core/file/rfile/bcfile/PrintBCInfo.java        |  14 +-
 .../spi/compaction/DefaultCompactionPlanner.java   | 117 +++++++++++-
 .../ShellCompactCommandConfigurerTest.java         |   5 +
 .../apache/accumulo/core/fate/AgeOffStoreTest.java |  17 +-
 .../org/apache/accumulo/core/fate/TestStore.java   |   3 +-
 .../compaction/DefaultCompactionPlannerTest.java   | 202 ++++++++++++++++++++-
 .../server/compaction/CompactionPluginUtils.java   |  16 +-
 .../coordinator/CompactionCoordinator.java         |  13 +-
 .../accumulo/test/conf/PropStoreConfigIT.java      |  41 +++++
 .../accumulo/test/functional/CompactionIT.java     | 192 ++++++++++++++++++++
 20 files changed, 666 insertions(+), 47 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/conf/Property.java
index b1bc08b2a4,ba0437a9fe..e568dd44d4
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@@ -957,16 -965,13 +957,20 @@@ public enum Property 
            + " adjusting this property you may want to consider adjusting"
            + " table.compaction.major.ratio also. Setting this property to 0 
will make"
            + " it default to tserver.scan.files.open.max-1, this will prevent 
a tablet"
-           + " from having more RFiles than can be opened. Setting this 
property low may"
-           + " throttle ingest and increase query performance.",
+           + " from having more RFiles than can be opened. Prior to 2.1.0 this 
property"
+           + " was used to trigger merging minor compactions, but merging 
minor compactions"
+           + " were removed in 2.1.0. Now this property is only used by the"
+           + " DefaultCompactionStrategy and the DefaultCompactionPlanner."
+           + " The DefaultCompactionPlanner started using this property in 
2.1.3, before"
+           + " that it did not use the property.",
        "1.4.0"),
 +  TABLE_MERGE_FILE_MAX("table.merge.file.max", "10000", PropertyType.COUNT,
 +      "The maximum number of files that a merge operation will process.  
Before "
 +          + "merging a sum of the number of files in the merge range is 
computed and if it "
 +          + "exceeds this configuration then the merge will error and fail.  
For example if "
 +          + "there are 100 tablets each having 10 files in the merge range, 
then the sum would "
 +          + "be 1000 and the merge will only proceed if this property is 
greater than 1000.",
 +      "4.0.0"),
   TABLE_FILE_SUMMARY_MAX_SIZE("table.file.summary.maxSize", "256k", PropertyType.BYTES,
       "The maximum size summary that will be stored. The number of RFiles that"
            + " had summary data exceeding this threshold is reported by"
diff --cc core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
index 874b58d8c6,0000000000..8c18fd378a
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
@@@ -1,321 -1,0 +1,325 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.core.fate;
 +
 +import static java.nio.charset.StandardCharsets.UTF_8;
 +
 +import java.io.ByteArrayInputStream;
 +import java.io.ByteArrayOutputStream;
 +import java.io.IOException;
 +import java.io.ObjectInputStream;
 +import java.io.ObjectOutputStream;
 +import java.io.Serializable;
 +import java.io.UncheckedIOException;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.EnumSet;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Optional;
 +import java.util.Set;
++import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +
 +import org.apache.accumulo.core.fate.Fate.TxInfo;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.base.Preconditions;
 +
 +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 +
 +public abstract class AbstractFateStore<T> implements FateStore<T> {
 +
 +  private static final Logger log = LoggerFactory.getLogger(AbstractFateStore.class);
 +
 +  protected final Set<Long> reserved;
 +  protected final Map<Long,Long> deferred;
 +
 +  // This is incremented each time a transaction was unreserved that was non new
 +  protected final SignalCount unreservedNonNewCount = new SignalCount();
 +
 +  // This is incremented each time a transaction is unreserved that was runnable
 +  protected final SignalCount unreservedRunnableCount = new SignalCount();
 +
 +  public AbstractFateStore() {
 +    this.reserved = new HashSet<>();
 +    this.deferred = new HashMap<>();
 +  }
 +
 +  public static byte[] serialize(Object o) {
 +    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
 +        ObjectOutputStream oos = new ObjectOutputStream(baos)) {
 +      oos.writeObject(o);
 +      return baos.toByteArray();
 +    } catch (IOException e) {
 +      throw new UncheckedIOException(e);
 +    }
 +  }
 +
 +  @SuppressFBWarnings(value = "OBJECT_DESERIALIZATION",
 +      justification = "unsafe to store arbitrary serialized objects like 
this, but needed for now"
 +          + " for backwards compatibility")
 +  public static Object deserialize(byte[] ser) {
 +    try (ByteArrayInputStream bais = new ByteArrayInputStream(ser);
 +        ObjectInputStream ois = new ObjectInputStream(bais)) {
 +      return ois.readObject();
 +    } catch (IOException e) {
 +      throw new UncheckedIOException(e);
 +    } catch (ReflectiveOperationException e) {
 +      throw new IllegalStateException(e);
 +    }
 +  }
 +
 +  /**
 +   * Attempt to reserve transaction
 +   *
 +   * @param tid transaction id
 +   * @return An Optional containing the FateTxStore if the transaction was successfully reserved, or
 +   *         an empty Optional if the transaction was already reserved.
 +   */
 +  @Override
 +  public Optional<FateTxStore<T>> tryReserve(long tid) {
 +    synchronized (this) {
 +      if (!reserved.contains(tid)) {
 +        return Optional.of(reserve(tid));
 +      }
 +      return Optional.empty();
 +    }
 +  }
 +
 +  @Override
 +  public FateTxStore<T> reserve(long tid) {
 +    synchronized (AbstractFateStore.this) {
 +      while (reserved.contains(tid)) {
 +        try {
 +          AbstractFateStore.this.wait(100);
 +        } catch (InterruptedException e) {
 +          Thread.currentThread().interrupt();
 +          throw new IllegalStateException(e);
 +        }
 +      }
 +
 +      reserved.add(tid);
 +      return newFateTxStore(tid, true);
 +    }
 +  }
 +
 +  @Override
 +  public Iterator<Long> runnable(AtomicBoolean keepWaiting) {
 +
 +    while (keepWaiting.get()) {
 +      ArrayList<Long> runnableTids = new ArrayList<>();
 +
 +      final long beforeCount = unreservedRunnableCount.getCount();
 +
 +      List<String> transactions = getTransactions();
 +      for (String txidStr : transactions) {
 +        long txid = parseTid(txidStr);
 +        if (isRunnable(_getStatus(txid))) {
 +          runnableTids.add(txid);
 +        }
 +      }
 +
 +      synchronized (this) {
 +        runnableTids.removeIf(txid -> {
 +          var deferredTime = deferred.get(txid);
 +          if (deferredTime != null) {
-             if (deferredTime >= System.currentTimeMillis()) {
++            if ((deferredTime - System.nanoTime()) > 0) {
 +              return true;
 +            } else {
 +              deferred.remove(txid);
 +            }
 +          }
 +
 +          return reserved.contains(txid);
 +        });
 +      }
 +
 +      if (runnableTids.isEmpty()) {
 +        if (beforeCount == unreservedRunnableCount.getCount()) {
 +          long waitTime = 5000;
 +          if (!deferred.isEmpty()) {
-             Long minTime = Collections.min(deferred.values());
-             waitTime = minTime - System.currentTimeMillis();
++            long currTime = System.nanoTime();
++            long minWait =
++                deferred.values().stream().mapToLong(l -> l - currTime).min().getAsLong();
++            waitTime = TimeUnit.MILLISECONDS.convert(minWait, TimeUnit.NANOSECONDS);
 +          }
 +
 +          if (waitTime > 0) {
 +            unreservedRunnableCount.waitFor(count -> count != beforeCount, waitTime,
 +                keepWaiting::get);
 +          }
 +        }
 +      } else {
 +        return runnableTids.iterator();
 +      }
 +
 +    }
 +
 +    return Collections.emptyIterator();
 +  }
 +
 +  @Override
 +  public List<Long> list() {
 +    ArrayList<Long> l = new ArrayList<>();
 +    List<String> transactions = getTransactions();
 +    for (String txid : transactions) {
 +      l.add(parseTid(txid));
 +    }
 +    return l;
 +  }
 +
 +  @Override
 +  public ReadOnlyFateTxStore<T> read(long tid) {
 +    return newFateTxStore(tid, false);
 +  }
 +
 +  protected boolean isRunnable(TStatus status) {
 +    return status == TStatus.IN_PROGRESS || status == TStatus.FAILED_IN_PROGRESS
 +        || status == TStatus.SUBMITTED;
 +  }
 +
 +  protected long parseTid(String txdir) {
 +    return Long.parseLong(txdir.split("_")[1], 16);
 +  }
 +
 +  protected abstract List<String> getTransactions();
 +
 +  protected abstract TStatus _getStatus(long tid);
 +
 +  protected abstract FateTxStore<T> newFateTxStore(long tid, boolean isReserved);
 +
 +  protected abstract class AbstractFateTxStoreImpl<T> implements FateTxStore<T> {
 +    protected final long tid;
 +    protected final boolean isReserved;
 +
 +    protected TStatus observedStatus = null;
 +
 +    protected AbstractFateTxStoreImpl(long tid, boolean isReserved) {
 +      this.tid = tid;
 +      this.isReserved = isReserved;
 +    }
 +
 +    @Override
 +    public TStatus waitForStatusChange(EnumSet<TStatus> expected) {
 +      Preconditions.checkState(!isReserved,
 +          "Attempted to wait for status change while reserved " + 
FateTxId.formatTid(getID()));
 +      while (true) {
 +
 +        long countBefore = unreservedNonNewCount.getCount();
 +
 +        TStatus status = _getStatus(tid);
 +        if (expected.contains(status)) {
 +          return status;
 +        }
 +
 +        unreservedNonNewCount.waitFor(count -> count != countBefore, 1000, () -> true);
 +      }
 +    }
 +
 +    @Override
-     public void unreserve(long deferTime) {
++    public void unreserve(long deferTime, TimeUnit timeUnit) {
++      deferTime = TimeUnit.NANOSECONDS.convert(deferTime, timeUnit);
 +
 +      if (deferTime < 0) {
 +        throw new IllegalArgumentException("deferTime < 0 : " + deferTime);
 +      }
 +
 +      synchronized (AbstractFateStore.this) {
 +        if (!reserved.remove(tid)) {
 +          throw new IllegalStateException(
 +              "Tried to unreserve id that was not reserved " + 
FateTxId.formatTid(tid));
 +        }
 +
 +        // notify any threads waiting to reserve
 +        AbstractFateStore.this.notifyAll();
 +
 +        if (deferTime > 0) {
-           deferred.put(tid, System.currentTimeMillis() + deferTime);
++          deferred.put(tid, System.nanoTime() + deferTime);
 +        }
 +      }
 +
 +      if (observedStatus != null && isRunnable(observedStatus)) {
 +        unreservedRunnableCount.increment();
 +      }
 +
 +      if (observedStatus != TStatus.NEW) {
 +        unreservedNonNewCount.increment();
 +      }
 +    }
 +
 +    protected void verifyReserved(boolean isWrite) {
 +      if (!isReserved && isWrite) {
 +        throw new IllegalStateException("Attempted write on unreserved FATE 
transaction.");
 +      }
 +
 +      if (isReserved) {
 +        synchronized (AbstractFateStore.this) {
 +          if (!reserved.contains(tid)) {
 +            throw new IllegalStateException(
 +                "Tried to operate on unreserved transaction " + 
FateTxId.formatTid(tid));
 +          }
 +        }
 +      }
 +    }
 +
 +    @Override
 +    public TStatus getStatus() {
 +      verifyReserved(false);
 +      var status = _getStatus(tid);
 +      observedStatus = status;
 +      return status;
 +    }
 +
 +    @Override
 +    public long getID() {
 +      return tid;
 +    }
 +
 +    protected byte[] serializeTxInfo(Serializable so) {
 +      if (so instanceof String) {
 +        return ("S " + so).getBytes(UTF_8);
 +      } else {
 +        byte[] sera = serialize(so);
 +        byte[] data = new byte[sera.length + 2];
 +        System.arraycopy(sera, 0, data, 2, sera.length);
 +        data[0] = 'O';
 +        data[1] = ' ';
 +        return data;
 +      }
 +    }
 +
 +    protected Serializable deserializeTxInfo(TxInfo txInfo, byte[] data) {
 +      if (data[0] == 'O') {
 +        byte[] sera = new byte[data.length - 2];
 +        System.arraycopy(data, 2, sera, 0, sera.length);
 +        return (Serializable) deserialize(sera);
 +      } else if (data[0] == 'S') {
 +        return new String(data, 2, data.length - 2, UTF_8);
 +      } else {
 +        throw new IllegalStateException("Bad node data " + txInfo);
 +      }
 +    }
 +
 +  }
 +}
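
The AbstractFateStore changes above move the deferral bookkeeping from wall-clock
System.currentTimeMillis() deadlines to System.nanoTime() deadlines. A minimal standalone
sketch of that pattern follows (the class and method names here are illustrative, not part
of the patch): deadlines are compared by subtraction, which stays correct if nanoTime wraps,
and are converted back to milliseconds only when a wait time is needed.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;

    class DeferralSketch {
      // txid -> System.nanoTime() deadline
      private final Map<Long,Long> deferred = new HashMap<>();

      void defer(long txid, long deferTime, TimeUnit unit) {
        deferred.put(txid, System.nanoTime() + TimeUnit.NANOSECONDS.convert(deferTime, unit));
      }

      boolean stillDeferred(long txid) {
        Long deadline = deferred.get(txid);
        // subtraction keeps the comparison valid even if nanoTime overflows
        return deadline != null && (deadline - System.nanoTime()) > 0;
      }

      long millisUntilNextDeadline() {
        long now = System.nanoTime();
        long minNanos = deferred.values().stream().mapToLong(d -> d - now).min().orElse(0);
        return TimeUnit.MILLISECONDS.convert(minNanos, TimeUnit.NANOSECONDS);
      }
    }
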
diff --cc core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
index 95ef99448f,858e6e6998..6a64014f01
--- a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
@@@ -32,10 -32,9 +32,11 @@@ import java.util.List
  import java.util.Map;
  import java.util.Map.Entry;
  import java.util.Set;
+ import java.util.concurrent.TimeUnit;
  
 -import org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus;
 +import org.apache.accumulo.core.fate.FateStore.FateTxStore;
 +import org.apache.accumulo.core.fate.ReadOnlyFateStore.ReadOnlyFateTxStore;
 +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
  import org.apache.accumulo.core.fate.zookeeper.FateLock;
  import org.apache.accumulo.core.fate.zookeeper.FateLock.FateLockPath;
  import org.apache.accumulo.core.fate.zookeeper.ZooReader;
@@@ -431,28 -432,26 +432,28 @@@ public class AdminUtil<T> 
        return false;
      }
      boolean state = false;
 -    zs.reserve(txid);
 -    TStatus ts = zs.getStatus(txid);
 -    switch (ts) {
 -      case UNKNOWN:
 -        System.out.printf("Invalid transaction ID: %016x%n", txid);
 -        break;
 -
 -      case SUBMITTED:
 -      case IN_PROGRESS:
 -      case NEW:
 -      case FAILED:
 -      case FAILED_IN_PROGRESS:
 -      case SUCCESSFUL:
 -        System.out.printf("Deleting transaction: %016x (%s)%n", txid, ts);
 -        zs.delete(txid);
 -        state = true;
 -        break;
 +    FateTxStore<T> txStore = zs.reserve(txid);
 +    try {
 +      TStatus ts = txStore.getStatus();
 +      switch (ts) {
 +        case UNKNOWN:
 +          System.out.printf("Invalid transaction ID: %016x%n", txid);
 +          break;
 +
 +        case SUBMITTED:
 +        case IN_PROGRESS:
 +        case NEW:
 +        case FAILED:
 +        case FAILED_IN_PROGRESS:
 +        case SUCCESSFUL:
 +          System.out.printf("Deleting transaction: %016x (%s)%n", txid, ts);
 +          txStore.delete();
 +          state = true;
 +          break;
 +      }
 +    } finally {
-       txStore.unreserve(0);
++      txStore.unreserve(0, TimeUnit.MILLISECONDS);
      }
 -
 -    zs.unreserve(txid, 0, TimeUnit.MILLISECONDS);
      return state;
    }
  
@@@ -470,36 -469,33 +471,36 @@@
        return false;
      }
      boolean state = false;
 -    zs.reserve(txid);
 -    TStatus ts = zs.getStatus(txid);
 -    switch (ts) {
 -      case UNKNOWN:
 -        System.out.printf("Invalid transaction ID: %016x%n", txid);
 -        break;
 -
 -      case SUBMITTED:
 -      case IN_PROGRESS:
 -      case NEW:
 -        System.out.printf("Failing transaction: %016x (%s)%n", txid, ts);
 -        zs.setStatus(txid, TStatus.FAILED_IN_PROGRESS);
 -        state = true;
 -        break;
 -
 -      case SUCCESSFUL:
 -        System.out.printf("Transaction already completed: %016x (%s)%n", 
txid, ts);
 -        break;
 -
 -      case FAILED:
 -      case FAILED_IN_PROGRESS:
 -        System.out.printf("Transaction already failed: %016x (%s)%n", txid, 
ts);
 -        state = true;
 -        break;
 +    FateTxStore<T> txStore = zs.reserve(txid);
 +    try {
 +      TStatus ts = txStore.getStatus();
 +      switch (ts) {
 +        case UNKNOWN:
 +          System.out.printf("Invalid transaction ID: %016x%n", txid);
 +          break;
 +
 +        case SUBMITTED:
 +        case IN_PROGRESS:
 +        case NEW:
 +          System.out.printf("Failing transaction: %016x (%s)%n", txid, ts);
 +          txStore.setStatus(TStatus.FAILED_IN_PROGRESS);
 +          state = true;
 +          break;
 +
 +        case SUCCESSFUL:
 +          System.out.printf("Transaction already completed: %016x (%s)%n", 
txid, ts);
 +          break;
 +
 +        case FAILED:
 +        case FAILED_IN_PROGRESS:
 +          System.out.printf("Transaction already failed: %016x (%s)%n", txid, 
ts);
 +          state = true;
 +          break;
 +      }
 +    } finally {
-       txStore.unreserve(0);
++      txStore.unreserve(0, TimeUnit.MILLISECONDS);
      }
  
 -    zs.unreserve(txid, 0, TimeUnit.MILLISECONDS);
      return state;
    }
  
diff --cc core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java
index f61c06028c,ca016d0c9c..080ff0d33d
--- a/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java
@@@ -24,8 -25,7 +24,9 @@@ import java.util.Iterator
  import java.util.List;
  import java.util.Map;
  import java.util.Map.Entry;
 +import java.util.Optional;
+ import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicBoolean;
  
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@@ -108,7 -108,7 +109,7 @@@ public class AgeOffStore<T> implements 
            }
  
          } finally {
-           txStore.unreserve(0);
 -          store.unreserve(txid, 0, TimeUnit.MILLISECONDS);
++          txStore.unreserve(0, TimeUnit.MILLISECONDS);
          }
        } catch (Exception e) {
          log.warn("Failed to age off FATE tx " + FateTxId.formatTid(txid), e);
@@@ -138,7 -138,7 +139,7 @@@
              break;
          }
        } finally {
-         txStore.unreserve(0);
 -        store.unreserve(txid, 0, TimeUnit.MILLISECONDS);
++        txStore.unreserve(0, TimeUnit.MILLISECONDS);
        }
      }
    }
diff --cc core/src/main/java/org/apache/accumulo/core/fate/Fate.java
index a54ad734ee,1a14418b1a..4b54ccc771
--- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@@ -38,7 -35,7 +38,8 @@@ import java.util.concurrent.LinkedTrans
  import java.util.concurrent.RejectedExecutionException;
  import java.util.concurrent.ScheduledThreadPoolExecutor;
  import java.util.concurrent.ThreadPoolExecutor;
+ import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.TransferQueue;
  import java.util.concurrent.atomic.AtomicBoolean;
  import java.util.function.Function;
  
@@@ -203,8 -132,8 +204,8 @@@ public class Fate<T> 
          } catch (Exception e) {
            runnerLog.error("Uncaught exception in FATE runner thread.", e);
          } finally {
 -          if (tid != null) {
 -            store.unreserve(tid, deferTime, TimeUnit.MILLISECONDS);
 +          if (txStore != null) {
-             txStore.unreserve(deferTime);
++            txStore.unreserve(deferTime, TimeUnit.MILLISECONDS);
            }
          }
        }
@@@ -356,15 -281,15 +357,15 @@@
          }
  
          if (autoCleanUp) {
 -          store.setTransactionInfo(tid, TxInfo.AUTO_CLEAN, autoCleanUp);
 +          txStore.setTransactionInfo(TxInfo.AUTO_CLEAN, autoCleanUp);
          }
  
 -        store.setTransactionInfo(tid, TxInfo.TX_NAME, txName);
 +        txStore.setTransactionInfo(TxInfo.TX_NAME, txName);
  
 -        store.setStatus(tid, SUBMITTED);
 +        txStore.setStatus(SUBMITTED);
        }
      } finally {
-       txStore.unreserve(0);
 -      store.unreserve(tid, 0, TimeUnit.MILLISECONDS);
++      txStore.unreserve(0, TimeUnit.MILLISECONDS);
      }
  
    }
@@@ -402,7 -325,7 +403,7 @@@
              return false;
            }
          } finally {
-           txStore.unreserve(0);
 -          store.unreserve(tid, 0, TimeUnit.MILLISECONDS);
++          txStore.unreserve(0, TimeUnit.MILLISECONDS);
          }
        } else {
          // reserved, lets retry.
@@@ -433,7 -356,7 +434,7 @@@
            break;
        }
      } finally {
-       txStore.unreserve(0);
 -      store.unreserve(tid, 0, TimeUnit.MILLISECONDS);
++      txStore.unreserve(0, TimeUnit.MILLISECONDS);
      }
    }
  
@@@ -444,9 -367,9 +445,9 @@@
          throw new IllegalStateException("Tried to get exception when 
transaction "
              + FateTxId.formatTid(tid) + " not in successful state");
        }
 -      return (String) store.getTransactionInfo(tid, TxInfo.RETURN_VALUE);
 +      return (String) txStore.getTransactionInfo(TxInfo.RETURN_VALUE);
      } finally {
-       txStore.unreserve(0);
 -      store.unreserve(tid, 0, TimeUnit.MILLISECONDS);
++      txStore.unreserve(0, TimeUnit.MILLISECONDS);
      }
    }
  
@@@ -458,9 -381,9 +459,9 @@@
          throw new IllegalStateException("Tried to get exception when 
transaction "
              + FateTxId.formatTid(tid) + " not in failed state");
        }
 -      return (Exception) store.getTransactionInfo(tid, TxInfo.EXCEPTION);
 +      return (Exception) txStore.getTransactionInfo(TxInfo.EXCEPTION);
      } finally {
-       txStore.unreserve(0);
 -      store.unreserve(tid, 0, TimeUnit.MILLISECONDS);
++      txStore.unreserve(0, TimeUnit.MILLISECONDS);
      }
    }
  
diff --cc core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
index 7db5766e81,0000000000..81a0e3212f
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
@@@ -1,110 -1,0 +1,111 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.core.fate;
 +
 +import java.io.Serializable;
 +import java.util.Optional;
++import java.util.concurrent.TimeUnit;
 +
 +/**
 + * Transaction Store: a place to save transactions
 + *
 + * A transaction consists of a number of operations. To use, first create a transaction id, and then
 + * seed the transaction with an initial operation. An executor service can then execute the
 + * transaction's operation, possibly pushing more operations onto the transaction as each step
 + * successfully completes. If a step fails, the stack can be unwound, undoing each operation.
 + */
 +public interface FateStore<T> extends ReadOnlyFateStore<T> {
 +
 +  /**
 +   * Create a new transaction id
 +   *
 +   * @return a transaction id
 +   */
 +  long create();
 +
 +  /**
 +   * An interface that allows read/write access to the data related to a single fate operation.
 +   */
 +  interface FateTxStore<T> extends ReadOnlyFateTxStore<T> {
 +    @Override
 +    Repo<T> top();
 +
 +    /**
 +     * Update the given transaction with the next operation
 +     *
 +     * @param repo the operation
 +     */
 +    void push(Repo<T> repo) throws StackOverflowException;
 +
 +    /**
 +     * Remove the last pushed operation from the given transaction.
 +     */
 +    void pop();
 +
 +    /**
 +     * Update the state of a given transaction
 +     *
 +     * @param status execution status
 +     */
 +    void setStatus(TStatus status);
 +
 +    /**
 +     * Set transaction-specific information.
 +     *
 +     * @param txInfo name of attribute of a transaction to set.
 +     * @param val transaction data to store
 +     */
 +    void setTransactionInfo(Fate.TxInfo txInfo, Serializable val);
 +
 +    /**
 +     * Remove the transaction from the store.
 +     *
 +     */
 +    void delete();
 +
 +    /**
 +     * Return the given transaction to the store.
 +     *
 +     * upon successful return the store now controls the referenced transaction id. caller should no
 +     * longer interact with it.
 +     *
 +     * @param deferTime time in millis to keep this transaction from being returned by
 +     *        {@link #runnable(java.util.concurrent.atomic.AtomicBoolean)}. Must be non-negative.
 +     */
-     void unreserve(long deferTime);
++    void unreserve(long deferTime, TimeUnit timeUnit);
 +  }
 +
 +  /**
 +   * Attempt to reserve transaction
 +   *
 +   * @param tid transaction id
 +   * @return true if reserved by this call, false if already reserved
 +   */
 +  Optional<FateTxStore<T>> tryReserve(long tid);
 +
 +  /**
 +   * Reserve the specific tid.
 +   *
 +   * Reserving a transaction id ensures that nothing else in-process interacting via the same
 +   * instance will be operating on that transaction id.
 +   *
 +   */
 +  FateTxStore<T> reserve(long tid);
 +
 +}
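
The FateStore javadoc above describes the intended lifecycle: create a transaction id, seed it
with an initial operation, and return it to the store. A minimal sketch of that flow against the
interfaces defined in this diff, using the new unreserve(long, TimeUnit) signature (the sketch
class and method names are illustrative; the store and repo instances are assumed to be supplied
by the caller):

    import java.util.concurrent.TimeUnit;

    import org.apache.accumulo.core.fate.FateStore;
    import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
    import org.apache.accumulo.core.fate.Repo;
    import org.apache.accumulo.core.fate.StackOverflowException;

    class FateStoreUsageSketch {
      static void seed(FateStore<String> store, Repo<String> initialRepo)
          throws StackOverflowException {
        long tid = store.create();
        FateStore.FateTxStore<String> tx = store.reserve(tid);
        try {
          tx.push(initialRepo);                    // seed the transaction with its first operation
          tx.setStatus(TStatus.SUBMITTED);
        } finally {
          tx.unreserve(0, TimeUnit.MILLISECONDS);  // zero defer time
        }
      }
    }
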
diff --cc core/src/main/java/org/apache/accumulo/core/fate/WrappedFateTxStore.java
index 238f981a22,0000000000..1d8c7126c2
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/WrappedFateTxStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/WrappedFateTxStore.java
@@@ -1,96 -1,0 +1,97 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.core.fate;
 +
 +import java.io.Serializable;
 +import java.util.EnumSet;
 +import java.util.List;
++import java.util.concurrent.TimeUnit;
 +
 +public class WrappedFateTxStore<T> implements FateStore.FateTxStore<T> {
 +  protected final FateStore.FateTxStore<T> wrapped;
 +
 +  public WrappedFateTxStore(FateStore.FateTxStore<T> wrapped) {
 +    this.wrapped = wrapped;
 +  }
 +
 +  @Override
-   public void unreserve(long deferTime) {
-     wrapped.unreserve(deferTime);
++  public void unreserve(long deferTime, TimeUnit timeUnit) {
++    wrapped.unreserve(deferTime, timeUnit);
 +  }
 +
 +  @Override
 +  public Repo<T> top() {
 +    return wrapped.top();
 +  }
 +
 +  @Override
 +  public void push(Repo<T> repo) throws StackOverflowException {
 +    wrapped.push(repo);
 +  }
 +
 +  @Override
 +  public void pop() {
 +    wrapped.pop();
 +  }
 +
 +  @Override
 +  public FateStore.TStatus getStatus() {
 +    return wrapped.getStatus();
 +  }
 +
 +  @Override
 +  public void setStatus(FateStore.TStatus status) {
 +    wrapped.setStatus(status);
 +  }
 +
 +  @Override
 +  public FateStore.TStatus waitForStatusChange(EnumSet<FateStore.TStatus> expected) {
 +    return wrapped.waitForStatusChange(expected);
 +  }
 +
 +  @Override
 +  public void setTransactionInfo(Fate.TxInfo txInfo, Serializable val) {
 +    wrapped.setTransactionInfo(txInfo, val);
 +  }
 +
 +  @Override
 +  public Serializable getTransactionInfo(Fate.TxInfo txInfo) {
 +    return wrapped.getTransactionInfo(txInfo);
 +  }
 +
 +  @Override
 +  public void delete() {
 +    wrapped.delete();
 +  }
 +
 +  @Override
 +  public long timeCreated() {
 +    return wrapped.timeCreated();
 +  }
 +
 +  @Override
 +  public long getID() {
 +    return wrapped.getID();
 +  }
 +
 +  @Override
 +  public List<ReadOnlyRepo<T>> getStack() {
 +    return wrapped.getStack();
 +  }
 +}
diff --cc core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java
index d3c3deb57a,eba25df062..dcbb78dfcd
--- a/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java
@@@ -370,94 -329,157 +386,179 @@@ public class DefaultCompactionPlanner i
  
      Set<CompactableFile> filesCopy = new HashSet<>(params.getCandidates());
  
 +    FakeFileGenerator fakeFileGenerator = new FakeFileGenerator();
 +
      long maxSizeToCompact = getMaxSizeToCompact(params.getKind());
  
 -    Collection<CompactableFile> group;
 -    if (params.getRunningCompactions().isEmpty()) {
 -      group =
 -          findDataFilesToCompact(filesCopy, params.getRatio(), maxFilesToCompact, maxSizeToCompact);
 +    // This set represents future files that will be produced by running compactions. If the optimal
 +    // set of files to compact is computed and contains one of these files, then its optimal to wait
 +    // for this compaction to finish.
 +    Set<CompactableFile> expectedFiles = new HashSet<>();
 +    params.getRunningCompactions().stream().filter(job -> job.getKind() == params.getKind())
 +        .map(job -> getExpected(job.getFiles(), fakeFileGenerator))
 +        .forEach(compactableFile -> Preconditions.checkState(expectedFiles.add(compactableFile)));
 +    Preconditions.checkState(Collections.disjoint(expectedFiles, filesCopy));
 +    filesCopy.addAll(expectedFiles);
  
 -      if (!group.isEmpty() && group.size() < params.getCandidates().size()
 -          && params.getCandidates().size() <= maxFilesToCompact
 -          && (params.getKind() == CompactionKind.USER
 -              || params.getKind() == CompactionKind.SELECTOR)) {
 -        // USER and SELECTOR compactions must eventually compact all files. When a subset of files
 -        // that meets the compaction ratio is selected, look ahead and see if the next compaction
 -        // would also meet the compaction ratio. If not then compact everything to avoid doing
 -        // more than logarithmic work across multiple comapctions.
 -
 -        filesCopy.removeAll(group);
 -        filesCopy.add(getExpected(group, 0));
 -
 -        if (findDataFilesToCompact(filesCopy, params.getRatio(), maxFilesToCompact,
 -            maxSizeToCompact).isEmpty()) {
 -          // The next possible compaction does not meet the compaction ratio, so compact
 -          // everything.
 -          group = Set.copyOf(params.getCandidates());
 -        }
 +    List<Collection<CompactableFile>> compactionJobs = new ArrayList<>();
  
 +    while (true) {
 +      var filesToCompact =
 +          findDataFilesToCompact(filesCopy, params.getRatio(), maxFilesToCompact, maxSizeToCompact);
 +      if (!Collections.disjoint(filesToCompact, expectedFiles)) {
 +        // the optimal set of files to compact includes the output of a running compaction, so lets
 +        // wait for that running compaction to finish.
 +        break;
        }
  
 -    } else if (params.getKind() == CompactionKind.SYSTEM) {
 -      // This code determines if once the files compacting finish would they be included in a
 -      // compaction with the files smaller than them? If so, then wait for the running compaction
 -      // to complete.
 +      if (filesToCompact.isEmpty()) {
 +        break;
 +      }
  
 -      // The set of files running compactions may produce
 -      var expectedFiles = getExpected(params.getRunningCompactions());
 +      filesCopy.removeAll(filesToCompact);
  
 -      if (!Collections.disjoint(filesCopy, expectedFiles)) {
 -        throw new AssertionError();
 -      }
 +      // A compaction job will be created for these files, so lets add an expected file for that
 +      // planned compaction job. Then if future iterations of this loop will include that file then
 +      // they will not compact.
 +      var expectedFile = getExpected(filesToCompact, fakeFileGenerator);
 +      Preconditions.checkState(expectedFiles.add(expectedFile));
 +      Preconditions.checkState(filesCopy.add(expectedFile));
  
 -      filesCopy.addAll(expectedFiles);
 +      compactionJobs.add(filesToCompact);
  
 -      group =
 -          findDataFilesToCompact(filesCopy, params.getRatio(), maxFilesToCompact, maxSizeToCompact);
 +      if (filesToCompact.size() < maxFilesToCompact) {
 +        // Only continue looking for more compaction jobs when a set of files is found equals
 +        // maxFilesToCompact in size. When the files found is less than the max size its an
 +        // indication that the compaction ratio was no longer met and therefore it would be
 +        // suboptimal to look for more jobs because the smallest optimal set was found.
 +        break;
 +      }
 +    }
  
 -      if (!Collections.disjoint(group, expectedFiles)) {
 -        // file produced by running compaction will eventually compact with existing files, so
 -        // wait.
 -        group = Set.of();
 +    if (compactionJobs.size() == 1
 +        && (params.getKind() == CompactionKind.USER || params.getKind() == 
CompactionKind.SELECTOR)
 +        && compactionJobs.get(0).size() < params.getCandidates().size()
 +        && compactionJobs.get(0).size() <= maxFilesToCompact) {
 +      // USER and SELECTOR compactions must eventually compact all files. When a subset of files
 +      // that meets the compaction ratio is selected, look ahead and see if the next compaction
 +      // would also meet the compaction ratio. If not then compact everything to avoid doing
 +      // more than logarithmic work across multiple comapctions.
 +
 +      var group = compactionJobs.get(0);
 +      var candidatesCopy = new HashSet<>(params.getCandidates());
 +
 +      candidatesCopy.removeAll(group);
 +      Preconditions.checkState(candidatesCopy.add(getExpected(group, fakeFileGenerator)));
 +
 +      if (findDataFilesToCompact(candidatesCopy, params.getRatio(), maxFilesToCompact,
 +          maxSizeToCompact).isEmpty()) {
 +        // The next possible compaction does not meet the compaction ratio, so compact
 +        // everything.
 +        compactionJobs.set(0, Set.copyOf(params.getCandidates()));
        }
 -    } else {
 -      group = Set.of();
      }
  
-     if (compactionJobs.isEmpty()
-         && (params.getKind() == CompactionKind.USER || params.getKind() == 
CompactionKind.SELECTOR)
-         && params.getRunningCompactions().stream()
-             .noneMatch(job -> job.getKind() == params.getKind())) {
-       // These kinds of compaction require files to compact even if none of 
the files meet the
-       // compaction ratio. No files were found using the compaction ratio and 
no compactions are
-       // running, so force a compaction.
-       compactionJobs = 
findMaximalRequiredSetToCompact(params.getCandidates(), maxFilesToCompact);
 -    if (group.isEmpty()) {
++    if (compactionJobs.isEmpty()) {
+       if ((params.getKind() == CompactionKind.USER || params.getKind() == 
CompactionKind.SELECTOR)
+           && params.getRunningCompactions().stream()
+               .noneMatch(job -> job.getKind() == params.getKind())) {
 -        group = findMaximalRequiredSetToCompact(params.getCandidates(), 
maxFilesToCompact);
++        // These kinds of compaction require files to compact even if none of 
the files meet the
++        // compaction ratio. No files were found using the compaction ratio 
and no compactions are
++        // running, so force a compaction.
++        compactionJobs = 
findMaximalRequiredSetToCompact(params.getCandidates(), maxFilesToCompact);
+       } else if (params.getKind() == CompactionKind.SYSTEM
+           && params.getRunningCompactions().isEmpty()
+           && params.getAll().size() == params.getCandidates().size()) {
+         int maxTabletFiles =
+             
getMaxTabletFiles(params.getServiceEnvironment().getConfiguration(params.getTableId()));
+         if (params.getAll().size() > maxTabletFiles) {
+           // The tablet is above its max files, there are no compactions 
running, all files are
+           // candidates for a system compaction, and no files were found to 
compact. Attempt to
+           // find a set of files to compact by lowering the compaction ratio.
 -          group = findFilesToCompactWithLowerRatio(params, maxSizeToCompact, 
maxTabletFiles);
++          compactionJobs =
++              findFilesToCompactWithLowerRatio(params, maxSizeToCompact, 
maxTabletFiles);
+         }
+       }
      }
  
 -    if (group.isEmpty()) {
 -      return params.createPlanBuilder().build();
 -    } else {
 -      // determine which executor to use based on the size of the files
 -      var ceid = getExecutor(group);
 -
 -      return params.createPlanBuilder().addJob(createPriority(params, group), ceid, group).build();
 -    }
 +    var builder = params.createPlanBuilder();
 +    compactionJobs.forEach(jobFiles -> builder.addJob(createPriority(params, jobFiles),
 +        getExecutor(jobFiles), jobFiles));
 +    return builder.build();
    }
  
+   static int getMaxTabletFiles(ServiceEnvironment.Configuration configuration) {
+     int maxTabletFiles = Integer.parseInt(configuration.get(Property.TABLE_FILE_MAX.getKey()));
+     if (maxTabletFiles <= 0) {
+       maxTabletFiles =
+           Integer.parseInt(configuration.get(Property.TSERV_SCAN_MAX_OPENFILES.getKey())) - 1;
+     }
+     return maxTabletFiles;
+   }
+ 
+   /**
+    * Searches for the highest compaction ratio that is less than the configured ratio that will
+    * lower the number of files.
+    */
 -  private Collection<CompactableFile> findFilesToCompactWithLowerRatio(PlanningParameters params,
 -      long maxSizeToCompact, int maxTabletFiles) {
++  private List<Collection<CompactableFile>> findFilesToCompactWithLowerRatio(
++      PlanningParameters params, long maxSizeToCompact, int maxTabletFiles) {
+     double lowRatio = 1.0;
+     double highRatio = params.getRatio();
+ 
+     Preconditions.checkArgument(highRatio >= lowRatio);
+ 
+     var candidates = Set.copyOf(params.getCandidates());
+     Collection<CompactableFile> found = Set.of();
+ 
+     int goalCompactionSize = candidates.size() - maxTabletFiles + 1;
+     if (goalCompactionSize > maxFilesToCompact) {
+       // The tablet is way over max tablet files, so multiple compactions will be needed. Therefore,
+       // do not set a goal size for this compaction and find the largest compaction ratio that will
+       // compact some set of files.
+       goalCompactionSize = 0;
+     }
+ 
+     // Do a binary search of the compaction ratios.
+     while (highRatio - lowRatio > .1) {
+       double ratioToCheck = (highRatio - lowRatio) / 2 + lowRatio;
+ 
+       // This is continually resorting the list of files in the following call, could optimize this
+       var filesToCompact =
+           findDataFilesToCompact(candidates, ratioToCheck, maxFilesToCompact, maxSizeToCompact);
+ 
 -      log.trace("Tried ratio {} and found {} {} {}", ratioToCheck, 
filesToCompact,
 -          filesToCompact.size() >= goalCompactionSize, goalCompactionSize);
++      log.info("Tried ratio {} and found {} {} {} {}", ratioToCheck, 
filesToCompact,
++          filesToCompact.size() >= goalCompactionSize, goalCompactionSize, 
maxFilesToCompact);
+ 
+       if (filesToCompact.isEmpty() || filesToCompact.size() < goalCompactionSize) {
+         highRatio = ratioToCheck;
+       } else {
+         lowRatio = ratioToCheck;
+         found = filesToCompact;
+       }
+     }
+ 
+     if (found.isEmpty() && lowRatio == 1.0) {
+       // in this case the data must be really skewed, operator intervention may be needed.
+       log.warn(
+           "Attempted to lower compaction ration from {} to {} for {} because there are {} files "
+               + "and the max tablet files is {}, however no set of files to compact were found.",
+           params.getRatio(), highRatio, params.getTableId(), params.getCandidates().size(),
+           maxTabletFiles);
+     }
+ 
+     log.info(
+         "For {} found {} files to compact lowering compaction ratio from {} 
to {} because the tablet "
+             + "exceeded {} files, it had {}",
+         params.getTableId(), found.size(), params.getRatio(), lowRatio, 
maxTabletFiles,
+         params.getCandidates().size());
+ 
 -    return found;
++    if (found.isEmpty()) {
++      return List.of();
++    } else {
++      return List.of(found);
++    }
+   }
+ 
    private static short createPriority(PlanningParameters params,
        Collection<CompactableFile> group) {
      return CompactionJobPrioritizer.createPriority(params.getKind(), params.getAll().size(),
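
The findFilesToCompactWithLowerRatio logic added in this file binary searches, in 0.1 steps,
for the highest compaction ratio between 1.0 and the configured ratio that still selects enough
files to bring the tablet back under its file limit. A standalone sketch of that search (the
class name and the findFiles callback standing in for findDataFilesToCompact are assumptions of
this sketch):

    import java.util.Set;
    import java.util.function.DoubleFunction;

    class RatioSearchSketch {
      static Set<String> search(double configuredRatio, int goalSize,
          DoubleFunction<Set<String>> findFiles) {
        double low = 1.0, high = configuredRatio;
        Set<String> found = Set.of();
        while (high - low > 0.1) {
          double mid = (high - low) / 2 + low;
          Set<String> files = findFiles.apply(mid);
          if (files.isEmpty() || files.size() < goalSize) {
            high = mid;      // this ratio selects too few files, probe lower ratios
          } else {
            low = mid;       // enough files selected, remember and probe higher ratios
            found = files;
          }
        }
        return found;
      }
    }
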
diff --cc core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java
index d2530ce1f3,c2b086ee34..1e02bd2620
--- a/core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java
@@@ -22,9 -22,10 +22,10 @@@ import static org.junit.jupiter.api.Ass
  
  import java.util.HashSet;
  import java.util.Set;
+ import java.util.concurrent.TimeUnit;
  
  import org.apache.accumulo.core.fate.AgeOffStore.TimeSource;
 -import org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus;
 +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
  import org.apache.zookeeper.KeeperException;
  import org.junit.jupiter.api.Test;
  
@@@ -50,25 -51,25 +51,25 @@@ public class AgeOffStoreTest 
      aoStore.ageOff();
  
      long txid1 = aoStore.create();
 -    aoStore.reserve(txid1);
 -    aoStore.setStatus(txid1, TStatus.IN_PROGRESS);
 -    aoStore.unreserve(txid1, 0, TimeUnit.MILLISECONDS);
 +    var txStore1 = aoStore.reserve(txid1);
 +    txStore1.setStatus(TStatus.IN_PROGRESS);
-     txStore1.unreserve(0);
++    txStore1.unreserve(0, TimeUnit.MILLISECONDS);
  
      aoStore.ageOff();
  
      long txid2 = aoStore.create();
 -    aoStore.reserve(txid2);
 -    aoStore.setStatus(txid2, TStatus.IN_PROGRESS);
 -    aoStore.setStatus(txid2, TStatus.FAILED);
 -    aoStore.unreserve(txid2, 0, TimeUnit.MILLISECONDS);
 +    var txStore2 = aoStore.reserve(txid2);
 +    txStore2.setStatus(TStatus.IN_PROGRESS);
 +    txStore2.setStatus(TStatus.FAILED);
-     txStore2.unreserve(0);
++    txStore2.unreserve(0, TimeUnit.MILLISECONDS);
  
      tts.time = 6;
  
      long txid3 = aoStore.create();
 -    aoStore.reserve(txid3);
 -    aoStore.setStatus(txid3, TStatus.IN_PROGRESS);
 -    aoStore.setStatus(txid3, TStatus.SUCCESSFUL);
 -    aoStore.unreserve(txid3, 0, TimeUnit.MILLISECONDS);
 +    var txStore3 = aoStore.reserve(txid3);
 +    txStore3.setStatus(TStatus.IN_PROGRESS);
 +    txStore3.setStatus(TStatus.SUCCESSFUL);
-     txStore3.unreserve(0);
++    txStore3.unreserve(0, TimeUnit.MILLISECONDS);
  
      Long txid4 = aoStore.create();
  
@@@ -99,21 -100,21 +100,21 @@@
      TestTimeSource tts = new TestTimeSource();
      TestStore testStore = new TestStore();
      long txid1 = testStore.create();
 -    testStore.reserve(txid1);
 -    testStore.setStatus(txid1, TStatus.IN_PROGRESS);
 -    testStore.unreserve(txid1, 0, TimeUnit.MILLISECONDS);
 +    var txStore1 = testStore.reserve(txid1);
 +    txStore1.setStatus(TStatus.IN_PROGRESS);
-     txStore1.unreserve(0);
++    txStore1.unreserve(0, TimeUnit.MILLISECONDS);
  
      long txid2 = testStore.create();
 -    testStore.reserve(txid2);
 -    testStore.setStatus(txid2, TStatus.IN_PROGRESS);
 -    testStore.setStatus(txid2, TStatus.FAILED);
 -    testStore.unreserve(txid2, 0, TimeUnit.MILLISECONDS);
 +    var txStore2 = testStore.reserve(txid2);
 +    txStore2.setStatus(TStatus.IN_PROGRESS);
 +    txStore2.setStatus(TStatus.FAILED);
-     txStore2.unreserve(0);
++    txStore2.unreserve(0, TimeUnit.MILLISECONDS);
  
      long txid3 = testStore.create();
 -    testStore.reserve(txid3);
 -    testStore.setStatus(txid3, TStatus.IN_PROGRESS);
 -    testStore.setStatus(txid3, TStatus.SUCCESSFUL);
 -    testStore.unreserve(txid3, 0, TimeUnit.MILLISECONDS);
 +    var txStore3 = testStore.reserve(txid3);
 +    txStore3.setStatus(TStatus.IN_PROGRESS);
 +    txStore3.setStatus(TStatus.SUCCESSFUL);
-     txStore3.unreserve(0);
++    txStore3.unreserve(0, TimeUnit.MILLISECONDS);
  
      Long txid4 = testStore.create();
  
@@@ -134,9 -135,9 +135,9 @@@
      assertEquals(Set.of(txid1), new HashSet<>(aoStore.list()));
      assertEquals(1, new HashSet<>(aoStore.list()).size());
  
 -    aoStore.reserve(txid1);
 -    aoStore.setStatus(txid1, TStatus.FAILED_IN_PROGRESS);
 -    aoStore.unreserve(txid1, 0, TimeUnit.MILLISECONDS);
 +    txStore1 = aoStore.reserve(txid1);
 +    txStore1.setStatus(TStatus.FAILED_IN_PROGRESS);
-     txStore1.unreserve(0);
++    txStore1.unreserve(0, TimeUnit.MILLISECONDS);
  
      tts.time = 30;
  
@@@ -145,9 -146,9 +146,9 @@@
      assertEquals(Set.of(txid1), new HashSet<>(aoStore.list()));
      assertEquals(1, new HashSet<>(aoStore.list()).size());
  
 -    aoStore.reserve(txid1);
 -    aoStore.setStatus(txid1, TStatus.FAILED);
 -    aoStore.unreserve(txid1, 0, TimeUnit.MILLISECONDS);
 +    txStore1 = aoStore.reserve(txid1);
 +    txStore1.setStatus(TStatus.FAILED);
-     txStore1.unreserve(0);
++    txStore1.unreserve(0, TimeUnit.MILLISECONDS);
  
      aoStore.ageOff();
  
diff --cc core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
index 058b0c50a4,3253c41a90..1dabfcf697
--- a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
+++ b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
@@@ -18,17 -18,13 +18,18 @@@
   */
  package org.apache.accumulo.core.fate;
  
 +import java.io.Serializable;
  import java.util.ArrayList;
 +import java.util.EnumSet;
  import java.util.HashMap;
  import java.util.HashSet;
 +import java.util.Iterator;
  import java.util.List;
  import java.util.Map;
 +import java.util.Optional;
  import java.util.Set;
+ import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicBoolean;
  
  /**
   * Transient in memory store for transactions.
@@@ -66,97 -61,35 +67,97 @@@ public class TestStore implements FateS
      }
    }
  
 -  @Override
 -  public void unreserve(long tid, long deferTime, TimeUnit deferTimeUnit) {
 -    if (!reserved.remove(tid)) {
 -      throw new IllegalStateException();
 +  private class TestFateTxStore implements FateTxStore<String> {
 +
 +    private final long tid;
 +
 +    TestFateTxStore(long tid) {
 +      this.tid = tid;
      }
 -  }
  
 -  @Override
 -  public org.apache.accumulo.core.fate.TStore.TStatus getStatus(long tid) {
 -    if (!reserved.contains(tid)) {
 -      throw new IllegalStateException();
 +    @Override
 +    public Repo<String> top() {
 +      throw new UnsupportedOperationException();
      }
  
 -    TStatus status = statuses.get(tid);
 -    if (status == null) {
 -      return TStatus.UNKNOWN;
 +    @Override
 +    public List<ReadOnlyRepo<String>> getStack() {
 +      throw new UnsupportedOperationException();
      }
 -    return status;
 -  }
  
 -  @Override
 -  public void setStatus(long tid, org.apache.accumulo.core.fate.TStore.TStatus status) {
 -    if (!reserved.contains(tid)) {
 -      throw new IllegalStateException();
 +    @Override
 +    public TStatus getStatus() {
 +      if (!reserved.contains(tid)) {
 +        throw new IllegalStateException();
 +      }
 +
 +      TStatus status = statuses.get(tid);
 +      if (status == null) {
 +        return TStatus.UNKNOWN;
 +      }
 +      return status;
 +    }
 +
 +    @Override
 +    public TStatus waitForStatusChange(EnumSet<TStatus> expected) {
 +      throw new UnsupportedOperationException();
 +    }
 +
 +    @Override
 +    public Serializable getTransactionInfo(Fate.TxInfo txInfo) {
 +      throw new UnsupportedOperationException();
 +    }
 +
 +    @Override
 +    public long timeCreated() {
 +      throw new UnsupportedOperationException();
 +    }
 +
 +    @Override
 +    public long getID() {
 +      return tid;
 +    }
 +
 +    @Override
 +    public void push(Repo<String> repo) throws StackOverflowException {
 +      throw new UnsupportedOperationException();
 +    }
 +
 +    @Override
 +    public void pop() {
 +      throw new UnsupportedOperationException();
 +    }
 +
 +    @Override
 +    public void setStatus(TStatus status) {
 +      if (!reserved.contains(tid)) {
 +        throw new IllegalStateException();
 +      }
 +      if (!statuses.containsKey(tid)) {
 +        throw new IllegalStateException();
 +      }
 +      statuses.put(tid, status);
 +    }
 +
 +    @Override
 +    public void setTransactionInfo(Fate.TxInfo txInfo, Serializable val) {
 +      throw new UnsupportedOperationException();
      }
 -    if (!statuses.containsKey(tid)) {
 -      throw new IllegalStateException();
 +
 +    @Override
 +    public void delete() {
 +      if (!reserved.contains(tid)) {
 +        throw new IllegalStateException();
 +      }
 +      statuses.remove(tid);
 +    }
 +
 +    @Override
-     public void unreserve(long deferTime) {
++    public void unreserve(long deferTime, TimeUnit timeUnit) {
 +      if (!reserved.remove(tid)) {
 +        throw new IllegalStateException();
 +      }
      }
 -    statuses.put(tid, status);
    }
  
    @Override
diff --cc core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
index 6447a2d147,9f4f9d315c..7886a5f0c6
--- a/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
@@@ -27,14 -26,15 +27,16 @@@ import static org.junit.jupiter.api.Ass
  
  import java.net.URI;
  import java.net.URISyntaxException;
+ import java.util.ArrayList;
  import java.util.Collection;
 +import java.util.Collections;
  import java.util.HashMap;
  import java.util.HashSet;
  import java.util.List;
  import java.util.Map;
+ import java.util.Optional;
  import java.util.Set;
 -import java.util.stream.Collectors;
 +import java.util.stream.IntStream;
  
  import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
  import org.apache.accumulo.core.conf.ConfigurationCopy;
@@@ -600,6 -446,144 +605,146 @@@ public class DefaultCompactionPlannerTe
      assertTrue(e.getMessage().contains("maxSize"), "Error message didn't 
contain maxSize");
    }
  
+   // Test cases where a tablet has more than table.file.max files, but no files were found using the
+   // compaction ratio. The planner should try to find the highest ratio that will result in a
+   // compaction.
+   @Test
+   public void testMaxTabletFiles() throws Exception {
+     String executors = "[{'name':'small','type': 'internal','maxSize':'32M','numThreads':1},"
+         + "{'name':'medium','type': 'internal','maxSize':'128M','numThreads':2},"
+         + "{'name':'large','type': 'internal','numThreads':3}]";
+ 
+     Map<String,String> overrides = new HashMap<>();
+     overrides.put(Property.COMPACTION_SERVICE_PREFIX.getKey() + "cs1.planner.opts.maxOpen", "10");
+     overrides.put(Property.TABLE_FILE_MAX.getKey(), "7");
+     var conf = new ConfigurationImpl(SiteConfiguration.empty().withOverrides(overrides).build());
+ 
+     // For this case need to compact three files and the highest ratio that achieves that is 1.8
+     var planner = createPlanner(conf, executors);
+     var all = createCFs(1000, 1.1, 1.9, 1.8, 1.6, 1.3, 1.4, 1.3, 1.2, 1.1);
+     var params = createPlanningParams(all, all, Set.of(), 3, CompactionKind.SYSTEM, conf);
+     var plan = planner.makePlan(params);
+     var job = getOnlyElement(plan.getJobs());
+     assertEquals(createCFs(1000, 1.1, 1.9, 1.8), job.getFiles());
+ 
+     // For this case need to compact two files and the highest ratio that achieves that is 2.9
+     all = createCFs(1000, 2, 2.9, 2.8, 2.7, 2.6, 2.5, 2.4, 2.3);
+     params = createPlanningParams(all, all, Set.of(), 3, CompactionKind.SYSTEM, conf);
+     plan = planner.makePlan(params);
+     job = getOnlyElement(plan.getJobs());
+     assertEquals(createCFs(1000, 2, 2.9), job.getFiles());
+ 
+     all =
+         createCFs(1000, 1.1, 2.89, 2.85, 2.7, 2.3, 2.9, 2.8, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2);
+     params = createPlanningParams(all, all, Set.of(), 3, CompactionKind.SYSTEM, conf);
+     plan = planner.makePlan(params);
+     job = getOnlyElement(plan.getJobs());
+     assertEquals(createCFs(1000, 1.1, 2.89, 2.85, 2.7, 2.3, 2.9), job.getFiles());
+ 
+     all = createCFs(1000, 1.1, 1.2, 1.3, 1.4, 1.5, 1.6, 1.7, 1.8, 1.9, 1.1);
+     params = createPlanningParams(all, all, Set.of(), 3, CompactionKind.SYSTEM, conf);
+     plan = planner.makePlan(params);
+     job = getOnlyElement(plan.getJobs());
+     assertEquals(createCFs(1000, 1.1, 1.2, 1.3, 1.4, 1.5, 1.6, 1.7, 1.8, 1.9), job.getFiles());
+ 
+     // In this case the tablet can not be brought below the max files limit in a single compaction,
+     // so it should find the highest ratio to compact
+     for (var ratio : List.of(1.9, 2.0, 3.0, 4.0)) {
+       all = createCFs(1000, 1.9, 1.8, 1.7, 1.6, 1.5, 1.4, 1.5, 1.2, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1,
+           1.1, 1.1);
+       params = createPlanningParams(all, all, Set.of(), ratio, CompactionKind.SYSTEM, conf);
+       plan = planner.makePlan(params);
+       job = getOnlyElement(plan.getJobs());
+       assertEquals(createCFs(1000, 1.9), job.getFiles());
+     }
+ 
+     // In this case the tablet can be brought below the max limit in single compaction, so it should
+     // find this
+     all =
+         createCFs(1000, 1.9, 1.8, 1.7, 1.6, 1.5, 1.4, 1.5, 1.2, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1);
+     params = createPlanningParams(all, all, Set.of(), 3, CompactionKind.SYSTEM, conf);
+     plan = planner.makePlan(params);
+     job = getOnlyElement(plan.getJobs());
+     assertEquals(createCFs(1000, 1.9, 1.8, 1.7, 1.6, 1.5, 1.4, 1.5, 1.2, 1.1), job.getFiles());
+ 
+     // each file is 10x the size of the file smaller than it
+     all = createCFs(10, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1);
+     params = createPlanningParams(all, all, Set.of(), 3, CompactionKind.SYSTEM, conf);
+     plan = planner.makePlan(params);
+     job = getOnlyElement(plan.getJobs());
+     assertEquals(createCFs(10, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1), job.getFiles());
+ 
+     // test with some files growing 20x, ensure those are not included
+     for (var ratio : List.of(1.9, 2.0, 3.0, 4.0)) {
+       all = createCFs(10, 1.05, 1.05, 1.25, 1.75, 1.25, 1.05, 1.05, 1.05);
+       params = createPlanningParams(all, all, Set.of(), ratio, CompactionKind.SYSTEM, conf);
+       plan = planner.makePlan(params);
+       job = getOnlyElement(plan.getJobs());
+       assertEquals(createCFs(10, 1.05, 1.05, 1.25, 1.75), job.getFiles());
+     }
+ 
+   }
+ 
+   @Test
+   public void testMaxTabletFilesNoCompaction() throws Exception {
+     String executors = "[{'name':'small','type': 
'internal','maxSize':'32M','numThreads':1},"
+         + "{'name':'medium','type': 
'internal','maxSize':'128M','numThreads':2},"
+         + "{'name':'large','type': 
'internal','maxSize':'512M','numThreads':3}]";
+ 
+     Map<String,String> overrides = new HashMap<>();
+     overrides.put(Property.COMPACTION_SERVICE_PREFIX.getKey() + 
"cs1.planner.opts.maxOpen", "10");
+     overrides.put(Property.TABLE_FILE_MAX.getKey(), "7");
+     var conf = new 
ConfigurationImpl(SiteConfiguration.empty().withOverrides(overrides).build());
+ 
+     // ensure that when a compaction would be over the max size limit, it 
is not planned
+     var planner = createPlanner(conf, executors);
+     var all = createCFs(1_000_000_000, 2, 2, 2, 2, 2, 2, 2);
+     var params = createPlanningParams(all, all, Set.of(), 3, 
CompactionKind.SYSTEM, conf);
+     var plan = planner.makePlan(params);
+ 
++    System.out.println(plan.getJobs());
++
+     assertTrue(plan.getJobs().isEmpty());
+ 
+     // ensure that when a compaction is running and we are over the file max but 
below the compaction ratio,
+     // a compaction is not planned
+     all = createCFs(1_000, 2, 2, 2, 2, 2, 2, 2);
+     var job = new CompactionJobImpl((short) 1, 
CompactionExecutorIdImpl.externalId("ee1"),
+         createCFs("F1", "1000"), CompactionKind.SYSTEM, Optional.of(false));
+     params = createPlanningParams(all, all, Set.of(job), 3, 
CompactionKind.SYSTEM, conf);
+     plan = planner.makePlan(params);
+ 
+     assertTrue(plan.getJobs().isEmpty());
+ 
+     // a really bad situation, each file is 20 times the combined size of the 
smaller files. The algorithm
+     // does not search for ratios that low.
+     all = createCFs(10, 1.05, 1.05, 1.05, 1.05, 1.05, 1.05, 1.05, 1.05);
+     params = createPlanningParams(all, all, Set.of(), 3, 
CompactionKind.SYSTEM, conf);
+     plan = planner.makePlan(params);
+     assertTrue(plan.getJobs().isEmpty());
+   }
+ 
+   // Test to ensure that the plugin falls back from TABLE_FILE_MAX to 
TSERV_SCAN_MAX_OPENFILES
+   @Test
+   public void testMaxTableFilesFallback() throws Exception {
+     String executors = "[{'name':'small','type': 
'internal','maxSize':'32M','numThreads':1},"
+         + "{'name':'medium','type': 
'internal','maxSize':'128M','numThreads':2},"
+         + "{'name':'large','type': 'internal','numThreads':3}]";
+ 
+     Map<String,String> overrides = new HashMap<>();
+     overrides.put(Property.COMPACTION_SERVICE_PREFIX.getKey() + 
"cs1.planner.opts.maxOpen", "10");
+     overrides.put(Property.TABLE_FILE_MAX.getKey(), "0");
+     overrides.put(Property.TSERV_SCAN_MAX_OPENFILES.getKey(), "5");
+     var conf = new 
ConfigurationImpl(SiteConfiguration.empty().withOverrides(overrides).build());
+ 
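+     // Illustrative expectation: TABLE_FILE_MAX of 0 falls back to
+     // TSERV_SCAN_MAX_OPENFILES - 1 = 4 files, so the planner needs to compact 7 of the
+     // 10 files to leave the tablet with 4 files (3 uncompacted plus the compaction output).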
+     var planner = createPlanner(conf, executors);
+     var all = createCFs(1000, 1.9, 1.8, 1.7, 1.6, 1.5, 1.4, 1.3, 1.2, 1.1);
+     var params = createPlanningParams(all, all, Set.of(), 3, 
CompactionKind.SYSTEM, conf);
+     var plan = planner.makePlan(params);
+     var job = getOnlyElement(plan.getJobs());
+     assertEquals(createCFs(1000, 1.9, 1.8, 1.7, 1.6, 1.5, 1.4), 
job.getFiles());
+   }
+ 
    private CompactionJob createJob(CompactionKind kind, Set<CompactableFile> 
all,
        Set<CompactableFile> files) {
      return new CompactionPlanImpl.BuilderImpl(kind, all, all)
@@@ -607,16 -591,46 +752,53 @@@
          .build().getJobs().iterator().next();
    }
  
+   // Create a set of files whose sizes would require certain compaction 
ratios to compact
 -  private Set<CompactableFile> createCFs(int initialSize, double... 
desiredRatios)
 -      throws URISyntaxException {
++  private Set<CompactableFile> createCFs(int initialSize, double... 
desiredRatios) {
+     List<String> pairs = new ArrayList<>();
+     pairs.add("F1");
+     pairs.add(initialSize + "");
+ 
+     double previousFileSizes = initialSize;
+ 
+     int i = 2;
+     for (double desiredRatio : desiredRatios) {
+       Preconditions.checkArgument(desiredRatio > 1.0);
+       Preconditions.checkArgument(desiredRatio <= i);
+ 
+       /*
+        * The compaction ratio formula is fileSize * ratio < fileSize + 
previousFileSizes. The
+        * following equation is solved to compute a file size given a desired ratio.
+        *
+        * fileSize * ratio = fileSize + previousFileSizes
+        *
+        * fileSize * ratio - fileSize = previousFileSizes
+        *
+        * fileSize * (ratio - 1) = previousFileSizes
+        *
+        * fileSize = previousFileSizes / (ratio - 1)
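+        *
+        * For example (illustrative numbers only): with previousFileSizes = 1000 and a
+        * desired ratio of 2.0, fileSize = 1000 / (2.0 - 1) = 1000; a following file with a
+        * desired ratio of 1.5 would then be sized 2000 / (1.5 - 1) = 4000.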
+        */
+ 
+       double fileSize = previousFileSizes / (desiredRatio - 1);
+       pairs.add("F" + i + "_" + desiredRatio);
+       pairs.add(Math.round(fileSize) + "");
+ 
+       previousFileSizes += fileSize;
+       i++;
+     }
+ 
+     return createCFs(pairs.toArray(new String[0]));
+   }
+ 
 -  private static Set<CompactableFile> createCFs(String... namesSizePairs)
 -      throws URISyntaxException {
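 +  // Helper that builds a CompactableFile with a synthetic test path; the fixed URI is not
 +  // expected to be malformed, so URISyntaxException is rethrown unchecked.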
 +  private static CompactableFile createCF(String name, long size) {
 +    try {
 +      return CompactableFile
 +          .create(new URI("hdfs://fake/accumulo/tables/1/t-0000000z/" + name 
+ ".rf"), size, 0);
 +    } catch (URISyntaxException e) {
 +      throw new RuntimeException(e);
 +    }
 +  }
 +
 +  private static Set<CompactableFile> createCFs(String... namesSizePairs) {
      Set<CompactableFile> files = new HashSet<>();
  
      for (int i = 0; i < namesSizePairs.length; i += 2) {
@@@ -721,9 -750,13 +913,15 @@@
    private static CompactionPlanner.InitParameters getInitParams(Configuration 
conf,
        String executors) {
  
+     String maxOpen = conf.get(prefix + "cs1.planner.opts.maxOpen");
      Map<String,String> options = new HashMap<>();
      options.put("executors", executors.replaceAll("'", "\""));
-     options.put("maxOpen", "15");
+ 
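+     // Use a maxOpen override from the service configuration when one is set, otherwise
+     // fall back to the previous hard-coded default of 15.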
+     if (maxOpen != null) {
+       options.put("maxOpen", maxOpen);
++    } else {
++      options.put("maxOpen", "15");
+     }
  
      ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class);
      EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes();
diff --cc 
server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionPluginUtils.java
index 7978d3c2e5,0000000000..fdcb93732f
mode 100644,000000..100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionPluginUtils.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionPluginUtils.java
@@@ -1,331 -1,0 +1,337 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.server.compaction;
 +
 +import java.io.IOException;
 +import java.io.UncheckedIOException;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.Map;
 +import java.util.Optional;
 +import java.util.Set;
 +import java.util.function.Predicate;
 +import java.util.stream.Collectors;
 +
 +import org.apache.accumulo.core.classloader.ClassLoaderUtil;
 +import org.apache.accumulo.core.client.PluginEnvironment;
 +import org.apache.accumulo.core.client.admin.CompactionConfig;
 +import org.apache.accumulo.core.client.admin.PluginConfig;
 +import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
 +import org.apache.accumulo.core.client.admin.compaction.CompactionConfigurer;
 +import org.apache.accumulo.core.client.admin.compaction.CompactionSelector;
 +import org.apache.accumulo.core.client.rfile.RFileSource;
 +import org.apache.accumulo.core.client.sample.SamplerConfiguration;
 +import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
 +import org.apache.accumulo.core.client.summary.Summary;
 +import org.apache.accumulo.core.clientImpl.UserCompactionUtils;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.TableId;
 +import org.apache.accumulo.core.data.TabletId;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.dataImpl.KeyExtent;
 +import org.apache.accumulo.core.dataImpl.TabletIdImpl;
 +import org.apache.accumulo.core.file.FileOperations;
 +import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 +import org.apache.accumulo.core.metadata.CompactableFileImpl;
 +import org.apache.accumulo.core.metadata.StoredTabletFile;
 +import org.apache.accumulo.core.metadata.schema.DataFileValue;
 +import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
 +import org.apache.accumulo.core.spi.common.ServiceEnvironment;
 +import org.apache.accumulo.core.spi.compaction.CompactionDispatcher;
 +import org.apache.accumulo.core.summary.Gatherer;
 +import org.apache.accumulo.core.summary.SummarizerFactory;
 +import org.apache.accumulo.core.summary.SummaryCollection;
 +import org.apache.accumulo.core.summary.SummaryReader;
 +import org.apache.accumulo.server.ServerContext;
 +import org.apache.accumulo.server.ServiceEnvironmentImpl;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FSDataInputStream;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.collect.Collections2;
 +
 +public class CompactionPluginUtils {
 +
 +  private static final Logger log = 
LoggerFactory.getLogger(CompactionPluginUtils.class);
 +
 +  private static <T> T newInstance(AccumuloConfiguration tableConfig, String 
className,
 +      Class<T> baseClass) {
 +    String context = ClassLoaderUtil.tableContext(tableConfig);
 +    try {
 +      return ConfigurationTypeHelper.getClassInstance(context, className, 
baseClass);
 +    } catch (ReflectiveOperationException e) {
 +      throw new IllegalArgumentException(e);
 +    }
 +  }
 +
 +  public static Set<StoredTabletFile> selectFiles(ServerContext context, 
KeyExtent extent,
 +      CompactionConfig compactionConfig, Map<StoredTabletFile,DataFileValue> 
allFiles) {
 +    if (!UserCompactionUtils.isDefault(compactionConfig.getSelector())) {
 +      return selectFiles(context, extent, allFiles, 
compactionConfig.getSelector());
 +    } else {
 +      return allFiles.keySet();
 +    }
 +  }
 +
 +  private static Set<StoredTabletFile> selectFiles(ServerContext context, 
KeyExtent extent,
 +      Map<StoredTabletFile,DataFileValue> datafiles, PluginConfig 
selectorConfig) {
 +
 +    log.debug("Selecting files for {} using {}", extent, selectorConfig);
 +
 +    CompactionSelector selector = 
newInstance(context.getTableConfiguration(extent.tableId()),
 +        selectorConfig.getClassName(), CompactionSelector.class);
 +
 +    final ServiceEnvironment senv = new ServiceEnvironmentImpl(context);
 +
 +    selector.init(new CompactionSelector.InitParameters() {
 +      @Override
 +      public Map<String,String> getOptions() {
 +        return selectorConfig.getOptions();
 +      }
 +
 +      @Override
 +      public PluginEnvironment getEnvironment() {
 +        return senv;
 +      }
 +
 +      @Override
 +      public TableId getTableId() {
 +        return extent.tableId();
 +      }
 +    });
 +
 +    CompactionSelector.Selection selection =
 +        selector.select(new CompactionSelector.SelectionParameters() {
 +          @Override
 +          public PluginEnvironment getEnvironment() {
 +            return senv;
 +          }
 +
 +          @Override
 +          public Collection<CompactableFile> getAvailableFiles() {
 +            return Collections2.transform(datafiles.entrySet(),
 +                e -> new CompactableFileImpl(e.getKey(), e.getValue()));
 +          }
 +
 +          @Override
 +          public Collection<Summary> getSummaries(Collection<CompactableFile> 
files,
 +              Predicate<SummarizerConfiguration> summarySelector) {
 +
 +            // ELASTICITY_TODO this may open files for user tables in the 
manager, need to avoid
 +            // this. See #3526
 +
 +            try {
 +              var tableConf = context.getTableConfiguration(extent.tableId());
 +
 +              SummaryCollection sc = new SummaryCollection();
 +              SummarizerFactory factory = new SummarizerFactory(tableConf);
 +              for (CompactableFile cf : files) {
 +                var file = CompactableFileImpl.toStoredTabletFile(cf);
 +                FileSystem fs = 
context.getVolumeManager().getFileSystemByPath(file.getPath());
 +                Configuration conf = context.getHadoopConf();
 +                RFileSource source = new RFileSource(new 
FSDataInputStream(fs.open(file.getPath())),
 +                    fs.getFileStatus(file.getPath()).getLen(), 
file.getRange());
 +
 +                SummaryCollection fsc = SummaryReader
 +                    .load(conf, source, file.getFileName(), summarySelector, 
factory,
 +                        tableConf.getCryptoService())
 +                    .getSummaries(Collections.singletonList(new 
Gatherer.RowRange(extent)));
 +
 +                sc.merge(fsc, factory);
 +              }
 +              return sc.getSummaries();
 +            } catch (IOException ioe) {
 +              throw new UncheckedIOException(ioe);
 +            }
 +          }
 +
 +          @Override
 +          public TableId getTableId() {
 +            return extent.tableId();
 +          }
 +
 +          @Override
 +          public TabletId getTabletId() {
 +            return new TabletIdImpl(extent);
 +          }
 +
 +          @Override
 +          public Optional<SortedKeyValueIterator<Key,Value>> 
getSample(CompactableFile cf,
 +              SamplerConfiguration sc) {
 +
 +            // ELASTICITY_TODO this may open files for user tables in the 
manager, need to avoid
 +            // this. See #3526
 +
 +            try {
 +              var file = CompactableFileImpl.toStoredTabletFile(cf);
 +              FileSystem fs = 
context.getVolumeManager().getFileSystemByPath(file.getPath());
 +              Configuration conf = context.getHadoopConf();
 +              var tableConf = context.getTableConfiguration(extent.tableId());
 +              var iter = FileOperations.getInstance().newReaderBuilder()
 +                  .forFile(file, fs, conf, tableConf.getCryptoService())
 +                  
.withTableConfiguration(tableConf).seekToBeginning().build();
 +              var sampleIter = iter.getSample(new 
SamplerConfigurationImpl(sc));
 +              if (sampleIter == null) {
 +                iter.close();
 +                return Optional.empty();
 +              }
 +
 +              return Optional.of(sampleIter);
 +            } catch (IOException ioe) {
 +              throw new UncheckedIOException(ioe);
 +            }
 +          }
 +        });
 +
 +    return 
selection.getFilesToCompact().stream().map(CompactableFileImpl::toStoredTabletFile)
 +        .collect(Collectors.toSet());
 +  }
 +
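 +  // Computes per-compaction property overrides by invoking the configured
 +  // CompactionConfigurer, if any. inputFiles are the files being compacted by this job
 +  // and selectedFiles are the full set of files selected for the compaction.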
 +  public static Map<String,String> 
computeOverrides(Optional<CompactionConfig> compactionConfig,
-       ServerContext context, KeyExtent extent, Set<CompactableFile> files) {
++      ServerContext context, KeyExtent extent, Set<CompactableFile> 
inputFiles,
++      Set<CompactableFile> selectedFiles) {
 +
 +    if (compactionConfig.isPresent()
 +        && 
!UserCompactionUtils.isDefault(compactionConfig.orElseThrow().getConfigurer())) 
{
-       return CompactionPluginUtils.computeOverrides(context, extent, files,
++      return CompactionPluginUtils.computeOverrides(context, extent, 
inputFiles, selectedFiles,
 +          compactionConfig.orElseThrow().getConfigurer());
 +    }
 +
 +    var tableConf = context.getTableConfiguration(extent.tableId());
 +
 +    var configurorClass = tableConf.get(Property.TABLE_COMPACTION_CONFIGURER);
 +    if (configurorClass == null || configurorClass.isBlank()) {
 +      return Map.of();
 +    }
 +
 +    var opts =
 +        
tableConf.getAllPropertiesWithPrefixStripped(Property.TABLE_COMPACTION_CONFIGURER_OPTS);
 +
-     return CompactionPluginUtils.computeOverrides(context, extent, files,
++    return CompactionPluginUtils.computeOverrides(context, extent, 
inputFiles, selectedFiles,
 +        new PluginConfig(configurorClass, opts));
 +  }
 +
 +  public static Map<String,String> computeOverrides(ServerContext context, 
KeyExtent extent,
-       Set<CompactableFile> files, PluginConfig cfg) {
++      Set<CompactableFile> inputFiles, Set<CompactableFile> selectedFiles, 
PluginConfig cfg) {
 +
 +    CompactionConfigurer configurer = 
newInstance(context.getTableConfiguration(extent.tableId()),
 +        cfg.getClassName(), CompactionConfigurer.class);
 +
 +    final ServiceEnvironment senv = new ServiceEnvironmentImpl(context);
 +
 +    configurer.init(new CompactionConfigurer.InitParameters() {
 +      @Override
 +      public Map<String,String> getOptions() {
 +        return cfg.getOptions();
 +      }
 +
 +      @Override
 +      public PluginEnvironment getEnvironment() {
 +        return senv;
 +      }
 +
 +      @Override
 +      public TableId getTableId() {
 +        return extent.tableId();
 +      }
 +    });
 +
 +    var overrides = configurer.override(new 
CompactionConfigurer.InputParameters() {
 +      @Override
 +      public Collection<CompactableFile> getInputFiles() {
-         return files;
++        return inputFiles;
++      }
++
++      @Override
++      public Set<CompactableFile> getSelectedFiles() {
++        return selectedFiles;
 +      }
 +
 +      @Override
 +      public PluginEnvironment getEnvironment() {
 +        return senv;
 +      }
 +
 +      @Override
 +      public TableId getTableId() {
 +        return extent.tableId();
 +      }
 +
 +      @Override
 +      public TabletId getTabletId() {
 +        return new TabletIdImpl(extent);
 +      }
 +    });
 +
 +    if (overrides.getOverrides().isEmpty()) {
 +      return null;
 +    }
 +
 +    return overrides.getOverrides();
 +  }
 +
 +  static CompactionDispatcher createDispatcher(ServiceEnvironment env, 
TableId tableId) {
 +
 +    var conf = env.getConfiguration(tableId);
 +
 +    var className = conf.get(Property.TABLE_COMPACTION_DISPATCHER.getKey());
 +
 +    Map<String,String> opts = new HashMap<>();
 +
 +    
conf.getWithPrefix(Property.TABLE_COMPACTION_DISPATCHER_OPTS.getKey()).forEach((k,
 v) -> {
 +      
opts.put(k.substring(Property.TABLE_COMPACTION_DISPATCHER_OPTS.getKey().length()),
 v);
 +    });
 +
 +    var finalOpts = Collections.unmodifiableMap(opts);
 +
 +    CompactionDispatcher.InitParameters initParameters = new 
CompactionDispatcher.InitParameters() {
 +      @Override
 +      public Map<String,String> getOptions() {
 +        return finalOpts;
 +      }
 +
 +      @Override
 +      public TableId getTableId() {
 +        return tableId;
 +      }
 +
 +      @Override
 +      public ServiceEnvironment getServiceEnv() {
 +        return env;
 +      }
 +    };
 +
 +    CompactionDispatcher dispatcher = null;
 +    try {
 +      dispatcher = env.instantiate(tableId, className, 
CompactionDispatcher.class);
 +    } catch (ReflectiveOperationException e) {
 +      throw new RuntimeException(e);
 +    }
 +
 +    dispatcher.init(initParameters);
 +
 +    return dispatcher;
 +  }
 +}
diff --cc 
server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index 6d2a51c46f,0000000000..c03cb3241f
mode 100644,000000..100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@@ -1,1388 -1,0 +1,1399 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.manager.compaction.coordinator;
 +
 +import static java.util.concurrent.TimeUnit.MILLISECONDS;
 +import static java.util.concurrent.TimeUnit.MINUTES;
 +import static java.util.concurrent.TimeUnit.SECONDS;
 +import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.COMPACTED;
 +import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.ECOMP;
 +import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
 +import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
 +import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID;
 +import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
 +import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SCANS;
 +import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED;
 +
 +import java.io.FileNotFoundException;
 +import java.io.IOException;
 +import java.io.UncheckedIOException;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.EnumMap;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Objects;
 +import java.util.Optional;
 +import java.util.Set;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.CountDownLatch;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.ScheduledFuture;
 +import java.util.concurrent.ScheduledThreadPoolExecutor;
 +import java.util.concurrent.ThreadPoolExecutor;
 +import java.util.concurrent.TimeUnit;
 +import java.util.function.Consumer;
 +import java.util.stream.Collectors;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.TableDeletedException;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.client.admin.CompactionConfig;
++import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
 +import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode;
 +import org.apache.accumulo.core.clientImpl.thrift.TInfo;
 +import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
 +import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
 +import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
 +import 
org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException;
 +import 
org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService;
 +import org.apache.accumulo.core.compaction.thrift.TCompactionState;
 +import org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate;
 +import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
 +import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.NamespaceId;
 +import org.apache.accumulo.core.dataImpl.KeyExtent;
 +import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
 +import org.apache.accumulo.core.fate.FateTxId;
 +import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
 +import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil;
 +import org.apache.accumulo.core.metadata.AbstractTabletFile;
 +import org.apache.accumulo.core.metadata.CompactableFileImpl;
 +import org.apache.accumulo.core.metadata.ReferencedTabletFile;
 +import org.apache.accumulo.core.metadata.StoredTabletFile;
 +import org.apache.accumulo.core.metadata.schema.Ample;
 +import org.apache.accumulo.core.metadata.schema.Ample.Refreshes.RefreshEntry;
 +import org.apache.accumulo.core.metadata.schema.Ample.RejectionHandler;
 +import org.apache.accumulo.core.metadata.schema.DataFileValue;
 +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
 +import org.apache.accumulo.core.metadata.schema.ExternalCompactionMetadata;
 +import org.apache.accumulo.core.metadata.schema.SelectedFiles;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
 +import org.apache.accumulo.core.metrics.MetricsProducer;
 +import org.apache.accumulo.core.rpc.ThriftUtil;
 +import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
 +import org.apache.accumulo.core.spi.compaction.CompactionJob;
 +import org.apache.accumulo.core.spi.compaction.CompactionKind;
 +import org.apache.accumulo.core.tabletserver.thrift.InputFile;
 +import org.apache.accumulo.core.tabletserver.thrift.IteratorConfig;
 +import org.apache.accumulo.core.tabletserver.thrift.TCompactionKind;
 +import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats;
 +import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
 +import org.apache.accumulo.core.tabletserver.thrift.TabletServerClientService;
 +import org.apache.accumulo.core.util.Retry;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.core.util.cache.Caches.CacheName;
 +import org.apache.accumulo.core.util.compaction.CompactionExecutorIdImpl;
 +import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
 +import org.apache.accumulo.core.util.compaction.RunningCompaction;
 +import org.apache.accumulo.core.util.threads.ThreadPools;
 +import org.apache.accumulo.core.util.threads.Threads;
 +import org.apache.accumulo.core.volume.Volume;
 +import org.apache.accumulo.manager.EventCoordinator;
 +import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues;
 +import org.apache.accumulo.manager.tableOps.bulkVer2.TabletRefresher;
 +import org.apache.accumulo.server.ServerContext;
 +import org.apache.accumulo.server.compaction.CompactionConfigStorage;
 +import org.apache.accumulo.server.compaction.CompactionPluginUtils;
 +import org.apache.accumulo.server.manager.LiveTServerSet;
 +import org.apache.accumulo.server.security.SecurityOperation;
 +import org.apache.accumulo.server.tablets.TabletNameGenerator;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.thrift.TException;
 +import org.apache.zookeeper.KeeperException;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.github.benmanes.caffeine.cache.Cache;
 +import com.github.benmanes.caffeine.cache.CacheLoader;
 +import com.github.benmanes.caffeine.cache.LoadingCache;
 +import com.github.benmanes.caffeine.cache.Weigher;
 +import com.google.common.base.Preconditions;
 +import com.google.common.collect.Iterators;
 +import com.google.common.collect.Sets;
 +import com.google.common.net.HostAndPort;
 +import com.google.common.util.concurrent.MoreExecutors;
 +
 +import io.micrometer.core.instrument.Gauge;
 +import io.micrometer.core.instrument.MeterRegistry;
 +
 +public class CompactionCoordinator
 +    implements CompactionCoordinatorService.Iface, Runnable, MetricsProducer {
 +
 +  private static final Logger LOG = 
LoggerFactory.getLogger(CompactionCoordinator.class);
 +  private static final long FIFTEEN_MINUTES = TimeUnit.MINUTES.toMillis(15);
 +
 +  /*
 +   * Map of compactionId to RunningCompactions. This is an informational 
cache of what external
 +   * compactions may be running. It's possible it may contain external 
compactions that are not
 +   * actually running. It may not contain compactions that are actually 
running. The metadata table
 +   * is the most authoritative source of what external compactions are 
currently running, but it
 +   * does not have the stats that this map has.
 +   */
 +  protected static final Map<ExternalCompactionId,RunningCompaction> 
RUNNING_CACHE =
 +      new ConcurrentHashMap<>();
 +
 +  /*
 +   * When the manager starts up any refreshes that were in progress when the 
last manager process
 +   * died must be completed before new refresh entries are written. This map 
of countdown latches
 +   * helps achieve that goal.
 +   */
 +  private final Map<Ample.DataLevel,CountDownLatch> refreshLatches;
 +
 +  /* Map of group name to the last time a compactor called to get a compaction job */
 +  // ELASTICITY_TODO need to clean out groups that are no longer configured.
 +  private static final Map<String,Long> TIME_COMPACTOR_LAST_CHECKED = new 
ConcurrentHashMap<>();
 +
 +  private final ServerContext ctx;
 +  private final LiveTServerSet tserverSet;
 +  private final SecurityOperation security;
 +  private final CompactionJobQueues jobQueues;
 +  private final EventCoordinator eventCoordinator;
 +  // Exposed for tests
 +  protected volatile Boolean shutdown = false;
 +
 +  private final ScheduledThreadPoolExecutor schedExecutor;
 +
 +  private final Cache<ExternalCompactionId,RunningCompaction> completed;
 +  private LoadingCache<Long,CompactionConfig> compactionConfigCache;
 +  private final Cache<Path,Integer> checked_tablet_dir_cache;
 +  private final DeadCompactionDetector deadCompactionDetector;
 +
 +  private final QueueMetrics queueMetrics;
 +
 +  public CompactionCoordinator(ServerContext ctx, LiveTServerSet tservers,
 +      SecurityOperation security, EventCoordinator eventCoordinator) {
 +    this.ctx = ctx;
 +    this.tserverSet = tservers;
 +    this.schedExecutor = this.ctx.getScheduledExecutor();
 +    this.security = security;
 +    this.eventCoordinator = eventCoordinator;
 +
 +    this.jobQueues = new CompactionJobQueues(
 +        
ctx.getConfiguration().getCount(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE));
 +
 +    this.queueMetrics = new QueueMetrics(jobQueues);
 +
 +    var refreshLatches = new 
EnumMap<Ample.DataLevel,CountDownLatch>(Ample.DataLevel.class);
 +    refreshLatches.put(Ample.DataLevel.ROOT, new CountDownLatch(1));
 +    refreshLatches.put(Ample.DataLevel.METADATA, new CountDownLatch(1));
 +    refreshLatches.put(Ample.DataLevel.USER, new CountDownLatch(1));
 +    this.refreshLatches = Collections.unmodifiableMap(refreshLatches);
 +
 +    completed = 
ctx.getCaches().createNewBuilder(CacheName.COMPACTIONS_COMPLETED, true)
 +        .maximumSize(200).expireAfterWrite(10, TimeUnit.MINUTES).build();
 +
 +    CacheLoader<Long,CompactionConfig> loader =
 +        txid -> CompactionConfigStorage.getConfig(ctx, txid);
 +
 +    // Keep a small, short-lived cache of compaction config. Compaction config 
never changes; however,
 +    // when a compaction is canceled its config is deleted, which is why there is 
a time limit. It does
 +    // not hurt to let a job that was canceled start, as it will be canceled 
later. Caching this
 +    // immutable config helps avoid reading the same data over and over.
 +    compactionConfigCache = 
ctx.getCaches().createNewBuilder(CacheName.COMPACTION_CONFIGS, true)
 +        .expireAfterWrite(30, SECONDS).maximumSize(100).build(loader);
 +
 +    Weigher<Path,Integer> weigher = (path, count) -> {
 +      return path.toUri().toString().length();
 +    };
 +
 +    checked_tablet_dir_cache =
 +        ctx.getCaches().createNewBuilder(CacheName.COMPACTION_DIR_CACHE, true)
 +            .maximumWeight(10485760L).weigher(weigher).build();
 +
 +    deadCompactionDetector = new DeadCompactionDetector(this.ctx, this, 
schedExecutor);
 +    // At this point the manager does not have its lock so no actions should 
be taken yet
 +  }
 +
 +  private volatile Thread serviceThread = null;
 +
 +  public void start() {
 +    serviceThread = Threads.createThread("CompactionCoordinator Thread", 
this);
 +    serviceThread.start();
 +  }
 +
 +  public void shutdown() {
 +    shutdown = true;
 +    var localThread = serviceThread;
 +    if (localThread != null) {
 +      try {
 +        localThread.join();
 +      } catch (InterruptedException e) {
 +        LOG.error("Exception stopping compaction coordinator thread", e);
 +      }
 +    }
 +  }
 +
 +  protected void startCompactionCleaner(ScheduledThreadPoolExecutor 
schedExecutor) {
 +    ScheduledFuture<?> future =
 +        schedExecutor.scheduleWithFixedDelay(this::cleanUpCompactors, 0, 5, 
TimeUnit.MINUTES);
 +    ThreadPools.watchNonCriticalScheduledTask(future);
 +  }
 +
 +  protected void startRunningCleaner(ScheduledThreadPoolExecutor 
schedExecutor) {
 +    ScheduledFuture<?> future =
 +        schedExecutor.scheduleWithFixedDelay(this::cleanUpRunning, 0, 5, 
TimeUnit.MINUTES);
 +    ThreadPools.watchNonCriticalScheduledTask(future);
 +  }
 +
 +  private void processRefreshes(Ample.DataLevel dataLevel) {
 +    try (var refreshStream = ctx.getAmple().refreshes(dataLevel).stream()) {
 +      // process batches of refresh entries to avoid reading all into memory 
at once
 +      Iterators.partition(refreshStream.iterator(), 
10000).forEachRemaining(refreshEntries -> {
 +        LOG.info("Processing {} tablet refreshes for {}", 
refreshEntries.size(), dataLevel);
 +
 +        var extents =
 +            
refreshEntries.stream().map(RefreshEntry::getExtent).collect(Collectors.toList());
 +        var tabletsMeta = new HashMap<KeyExtent,TabletMetadata>();
 +        try (var tablets = ctx.getAmple().readTablets().forTablets(extents, 
Optional.empty())
 +            .fetch(PREV_ROW, LOCATION, SCANS).build()) {
 +          tablets.stream().forEach(tm -> tabletsMeta.put(tm.getExtent(), tm));
 +        }
 +
 +        var tserverRefreshes = new 
HashMap<TabletMetadata.Location,List<TKeyExtent>>();
 +
 +        refreshEntries.forEach(refreshEntry -> {
 +          var tm = tabletsMeta.get(refreshEntry.getExtent());
 +
 +          // only need to refresh if the tablet is still on the same tserver 
instance
 +          if (tm != null && tm.getLocation() != null
 +              && 
tm.getLocation().getServerInstance().equals(refreshEntry.getTserver())) {
 +            KeyExtent extent = tm.getExtent();
 +            Collection<StoredTabletFile> scanfiles = tm.getScans();
 +            var ttr = extent.toThrift();
 +            tserverRefreshes.computeIfAbsent(tm.getLocation(), k -> new 
ArrayList<>()).add(ttr);
 +          }
 +        });
 +
 +        String logId = "Coordinator:" + dataLevel;
 +        ThreadPoolExecutor threadPool =
 +            ctx.threadPools().createFixedThreadPool(10, "Tablet refresh " + 
logId, false);
 +        try {
 +          TabletRefresher.refreshTablets(threadPool, logId, ctx, 
tserverSet::getCurrentServers,
 +              tserverRefreshes);
 +        } finally {
 +          threadPool.shutdownNow();
 +        }
 +
 +        ctx.getAmple().refreshes(dataLevel).delete(refreshEntries);
 +      });
 +    }
 +    // allow new refreshes to be written now that all preexisting ones are 
processed
 +    refreshLatches.get(dataLevel).countDown();
 +  }
 +
 +  @Override
 +  public void run() {
 +
 +    processRefreshes(Ample.DataLevel.ROOT);
 +    processRefreshes(Ample.DataLevel.METADATA);
 +    processRefreshes(Ample.DataLevel.USER);
 +
 +    startCompactionCleaner(schedExecutor);
 +    startRunningCleaner(schedExecutor);
 +
 +    // On a re-start of the coordinator it's possible that external 
compactions are in-progress.
 +    // Attempt to get the running compactions on the compactors and then 
resolve which tserver
 +    // the external compaction came from to re-populate the RUNNING 
collection.
 +    LOG.info("Checking for running external compactions");
 +    // On re-start contact the running Compactors to try and seed the list of 
running compactions
 +    List<RunningCompaction> running = getCompactionsRunningOnCompactors();
 +    if (running.isEmpty()) {
 +      LOG.info("No running external compactions found");
 +    } else {
 +      LOG.info("Found {} running external compactions", running.size());
 +      running.forEach(rc -> {
 +        TCompactionStatusUpdate update = new TCompactionStatusUpdate();
 +        update.setState(TCompactionState.IN_PROGRESS);
 +        update.setMessage("Coordinator restarted, compaction found in 
progress");
 +        rc.addUpdate(System.currentTimeMillis(), update);
 +        
RUNNING_CACHE.put(ExternalCompactionId.of(rc.getJob().getExternalCompactionId()),
 rc);
 +      });
 +    }
 +
 +    startDeadCompactionDetector();
 +
 +    // ELASTICITY_TODO the main function of the following loop was getting 
group summaries from
 +    // tservers. It's no longer doing that. It may be best to remove the loop and 
make the remaining
 +    // task a scheduled one.
 +
 +    LOG.info("Starting loop to check tservers for compaction summaries");
 +    while (!shutdown) {
 +      long start = System.currentTimeMillis();
 +
 +      long now = System.currentTimeMillis();
 +      TIME_COMPACTOR_LAST_CHECKED.forEach((k, v) -> {
 +        if ((now - v) > getMissingCompactorWarningTime()) {
 +          // ELASTICITY_TODO may want to consider if the group has any jobs 
queued OR if the group
 +          // still exists in configuration
 +          LOG.warn("No compactors have checked in with coordinator for group 
{} in {}ms", k,
 +              getMissingCompactorWarningTime());
 +        }
 +      });
 +
 +      long checkInterval = getTServerCheckInterval();
 +      long duration = (System.currentTimeMillis() - start);
 +      if (checkInterval - duration > 0) {
 +        LOG.debug("Waiting {}ms for next group check", (checkInterval - 
duration));
 +        UtilWaitThread.sleep(checkInterval - duration);
 +      }
 +    }
 +
 +    LOG.info("Shutting down");
 +  }
 +
 +  protected void startDeadCompactionDetector() {
 +    deadCompactionDetector.start();
 +  }
 +
 +  protected long getMissingCompactorWarningTime() {
 +    return FIFTEEN_MINUTES;
 +  }
 +
 +  protected long getTServerCheckInterval() {
 +    return this.ctx.getConfiguration()
 +        
.getTimeInMillis(Property.COMPACTION_COORDINATOR_TSERVER_COMPACTION_CHECK_INTERVAL);
 +  }
 +
 +  public long getNumRunningCompactions() {
 +    return RUNNING_CACHE.size();
 +  }
 +
 +  /**
 +   * Return the next compaction job from the queue to a Compactor
 +   *
 +   * @param groupName group
 +   * @param compactorAddress compactor address
 +   * @throws ThriftSecurityException when permission error
 +   * @return compaction job
 +   */
 +  @Override
 +  public TExternalCompactionJob getCompactionJob(TInfo tinfo, TCredentials 
credentials,
 +      String groupName, String compactorAddress, String externalCompactionId)
 +      throws ThriftSecurityException {
 +
 +    // do not expect users to call this directly, expect compactors to call 
this method
 +    if (!security.canPerformSystemActions(credentials)) {
 +      throw new AccumuloSecurityException(credentials.getPrincipal(),
 +          SecurityErrorCode.PERMISSION_DENIED).asThriftException();
 +    }
 +    final String group = groupName.intern();
 +    LOG.trace("getCompactionJob called for group {} by compactor {}", group, 
compactorAddress);
 +    TIME_COMPACTOR_LAST_CHECKED.put(group, System.currentTimeMillis());
 +
 +    TExternalCompactionJob result = null;
 +
 +    CompactionJobQueues.MetaJob metaJob =
 +        jobQueues.poll(CompactionExecutorIdImpl.externalId(groupName));
 +
 +    while (metaJob != null) {
 +
 +      Optional<CompactionConfig> compactionConfig = 
getCompactionConfig(metaJob);
 +
 +      // this method may reread the metadata; do not use the metadata in 
metaJob for anything after
 +      // this method
 +      ExternalCompactionMetadata ecm = null;
 +
 +      var kind = metaJob.getJob().getKind();
 +
 +      // Only reserve user compactions when the config is present. When 
compactions are canceled the
 +      // config is deleted.
 +      if (kind == CompactionKind.SYSTEM
 +          || (kind == CompactionKind.USER && compactionConfig.isPresent())) {
 +        ecm = reserveCompaction(metaJob, compactorAddress,
 +            ExternalCompactionId.from(externalCompactionId));
 +      }
 +
 +      if (ecm != null) {
 +        result = createThriftJob(externalCompactionId, ecm, metaJob, 
compactionConfig);
 +        // It is possible that by the time this is added, the compactor 
that made this request
 +        // is dead. In this case the compaction is not actually running.
 +        
RUNNING_CACHE.put(ExternalCompactionId.of(result.getExternalCompactionId()),
 +            new RunningCompaction(result, compactorAddress, group));
 +        LOG.debug("Returning external job {} to {} with {} files", 
result.externalCompactionId,
 +            compactorAddress, ecm.getJobFiles().size());
 +        break;
 +      } else {
 +        LOG.debug("Unable to reserve compaction job for {}, pulling another 
off the queue ",
 +            metaJob.getTabletMetadata().getExtent());
 +        metaJob = 
jobQueues.poll(CompactionExecutorIdImpl.externalId(groupName));
 +      }
 +    }
 +
 +    if (metaJob == null) {
 +      LOG.debug("No jobs found in group {} ", group);
 +    }
 +
 +    if (result == null) {
 +      LOG.trace("No jobs found for group {}, returning empty job to compactor 
{}", group,
 +          compactorAddress);
 +      result = new TExternalCompactionJob();
 +    }
 +
 +    return result;
 +
 +  }
 +
 +  // ELASTICITY_TODO unit test this code
 +  private boolean canReserveCompaction(TabletMetadata tablet, CompactionJob 
job,
 +      Set<StoredTabletFile> jobFiles) {
 +
 +    if (tablet == null) {
 +      // the tablet no longer exists
 +      return false;
 +    }
 +
 +    if (tablet.getOperationId() != null) {
 +      return false;
 +    }
 +
 +    if (!tablet.getFiles().containsAll(jobFiles)) {
 +      return false;
 +    }
 +
 +    var currentlyCompactingFiles = 
tablet.getExternalCompactions().values().stream()
 +        .flatMap(ecm -> 
ecm.getJobFiles().stream()).collect(Collectors.toSet());
 +
 +    if (!Collections.disjoint(jobFiles, currentlyCompactingFiles)) {
 +      return false;
 +    }
 +
 +    switch (job.getKind()) {
 +      case SYSTEM:
 +        if (tablet.getSelectedFiles() != null
 +            && !Collections.disjoint(jobFiles, 
tablet.getSelectedFiles().getFiles())) {
 +          return false;
 +        }
 +        break;
 +      case USER:
 +      case SELECTOR:
 +        if (tablet.getSelectedFiles() == null
 +            || !tablet.getSelectedFiles().getFiles().containsAll(jobFiles)) {
 +          return false;
 +        }
 +        break;
 +      default:
 +        throw new UnsupportedOperationException("Not currently handling " + 
job.getKind());
 +    }
 +
 +    return true;
 +  }
 +
 +  private void checkTabletDir(KeyExtent extent, Path path) {
 +    try {
 +      if (checked_tablet_dir_cache.getIfPresent(path) == null) {
 +        FileStatus[] files = null;
 +        try {
 +          files = ctx.getVolumeManager().listStatus(path);
 +        } catch (FileNotFoundException ex) {
 +          // ignored
 +        }
 +
 +        if (files == null) {
 +          LOG.debug("Tablet {} had no dir, creating {}", extent, path);
 +
 +          ctx.getVolumeManager().mkdirs(path);
 +        }
 +        checked_tablet_dir_cache.put(path, 1);
 +      }
 +    } catch (IOException e) {
 +      throw new UncheckedIOException(e);
 +    }
 +  }
 +
 +  private ExternalCompactionMetadata 
createExternalCompactionMetadata(CompactionJob job,
 +      Set<StoredTabletFile> jobFiles, TabletMetadata tablet, String 
compactorAddress,
 +      ExternalCompactionId externalCompactionId) {
 +    boolean propDels;
 +
 +    Long fateTxId = null;
 +
 +    switch (job.getKind()) {
 +      case SYSTEM: {
 +        boolean compactingAll = tablet.getFiles().equals(jobFiles);
 +        propDels = !compactingAll;
 +      }
 +        break;
 +      case SELECTOR:
 +      case USER: {
 +        boolean compactingAll = 
tablet.getSelectedFiles().initiallySelectedAll()
 +            && tablet.getSelectedFiles().getFiles().equals(jobFiles);
 +        propDels = !compactingAll;
 +        fateTxId = tablet.getSelectedFiles().getFateTxId();
 +      }
 +        break;
 +      default:
 +        throw new IllegalArgumentException();
 +    }
 +
 +    Consumer<String> directoryCreator = dir -> 
checkTabletDir(tablet.getExtent(), new Path(dir));
 +    ReferencedTabletFile newFile = 
TabletNameGenerator.getNextDataFilenameForMajc(propDels, ctx,
 +        tablet, directoryCreator, externalCompactionId);
 +
 +    return new ExternalCompactionMetadata(jobFiles, newFile, 
compactorAddress, job.getKind(),
 +        job.getPriority(), job.getExecutor(), propDels, fateTxId);
 +
 +  }
 +
 +  private ExternalCompactionMetadata 
reserveCompaction(CompactionJobQueues.MetaJob metaJob,
 +      String compactorAddress, ExternalCompactionId externalCompactionId) {
 +
 +    Preconditions.checkArgument(metaJob.getJob().getKind() == 
CompactionKind.SYSTEM
 +        || metaJob.getJob().getKind() == CompactionKind.USER);
 +
 +    var tabletMetadata = metaJob.getTabletMetadata();
 +
 +    var jobFiles = 
metaJob.getJob().getFiles().stream().map(CompactableFileImpl::toStoredTabletFile)
 +        .collect(Collectors.toSet());
 +
 +    Retry retry =
 +        Retry.builder().maxRetries(5).retryAfter(100, 
MILLISECONDS).incrementBy(100, MILLISECONDS)
 +            .maxWait(10, SECONDS).backOffFactor(1.5).logInterval(3, 
MINUTES).createRetry();
 +
 +    while (retry.canRetry()) {
 +      try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) {
 +        var extent = metaJob.getTabletMetadata().getExtent();
 +
 +        if (!canReserveCompaction(tabletMetadata, metaJob.getJob(), 
jobFiles)) {
 +          return null;
 +        }
 +
 +        var ecm = createExternalCompactionMetadata(metaJob.getJob(), 
jobFiles, tabletMetadata,
 +            compactorAddress, externalCompactionId);
 +
 +        // any data that is read from the tablet to make a decision about whether 
it can compact or not
 +        // must be included in the requireSame call
 +        var tabletMutator = 
tabletsMutator.mutateTablet(extent).requireAbsentOperation()
 +            .requireSame(tabletMetadata, FILES, SELECTED, ECOMP);
 +
 +        tabletMutator.putExternalCompaction(externalCompactionId, ecm);
 +        tabletMutator.submit(tm -> 
tm.getExternalCompactions().containsKey(externalCompactionId));
 +
 +        var result = tabletsMutator.process().get(extent);
 +
 +        if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) {
 +          return ecm;
 +        } else {
 +          tabletMetadata = result.readMetadata();
 +        }
 +      }
 +
 +      retry.useRetry();
 +      try {
 +        retry.waitForNextAttempt(LOG,
 +            "Reserved compaction for " + 
metaJob.getTabletMetadata().getExtent());
 +      } catch (InterruptedException e) {
 +        throw new RuntimeException(e);
 +      }
 +    }
 +
 +    return null;
 +  }
 +
 +  TExternalCompactionJob createThriftJob(String externalCompactionId,
 +      ExternalCompactionMetadata ecm, CompactionJobQueues.MetaJob metaJob,
 +      Optional<CompactionConfig> compactionConfig) {
 +
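++    // System compactions have no selected file set, so pass an empty set to the
++    // compaction configurer; otherwise expose the tablet's full set of selected files.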
++    Set<CompactableFile> selectedFiles;
++    if (metaJob.getJob().getKind() == CompactionKind.SYSTEM) {
++      selectedFiles = Set.of();
++    } else {
++      selectedFiles = 
metaJob.getTabletMetadata().getSelectedFiles().getFiles().stream()
++          .map(file -> new CompactableFileImpl(file,
++              metaJob.getTabletMetadata().getFilesMap().get(file)))
++          .collect(Collectors.toUnmodifiableSet());
++    }
++
 +    Map<String,String> overrides = 
CompactionPluginUtils.computeOverrides(compactionConfig, ctx,
-         metaJob.getTabletMetadata().getExtent(), metaJob.getJob().getFiles());
++        metaJob.getTabletMetadata().getExtent(), metaJob.getJob().getFiles(), 
selectedFiles);
 +
 +    IteratorConfig iteratorSettings = SystemIteratorUtil
 +        
.toIteratorConfig(compactionConfig.map(CompactionConfig::getIterators).orElse(List.of()));
 +
 +    var files = ecm.getJobFiles().stream().map(storedTabletFile -> {
 +      var dfv = 
metaJob.getTabletMetadata().getFilesMap().get(storedTabletFile);
 +      return new InputFile(storedTabletFile.getMetadata(), dfv.getSize(), 
dfv.getNumEntries(),
 +          dfv.getTime());
 +    }).collect(Collectors.toList());
 +
 +    long fateTxid = 0;
 +    if (metaJob.getJob().getKind() == CompactionKind.USER) {
 +      fateTxid = metaJob.getTabletMetadata().getSelectedFiles().getFateTxId();
 +    }
 +
 +    return new TExternalCompactionJob(externalCompactionId,
 +        metaJob.getTabletMetadata().getExtent().toThrift(), files, 
iteratorSettings,
 +        ecm.getCompactTmpName().getNormalizedPathStr(), 
ecm.getPropagateDeletes(),
 +        TCompactionKind.valueOf(ecm.getKind().name()), fateTxid, overrides);
 +  }
 +
 +  @Override
 +  public void registerMetrics(MeterRegistry registry) {
 +    Gauge.builder(METRICS_MAJC_QUEUED, jobQueues, 
CompactionJobQueues::getQueuedJobCount)
 +        .description("Number of queued major compactions").register(registry);
 +    Gauge.builder(METRICS_MAJC_RUNNING, this, 
CompactionCoordinator::getNumRunningCompactions)
 +        .description("Number of running major 
compactions").register(registry);
 +
 +    queueMetrics.registerMetrics(registry);
 +  }
 +
 +  public void addJobs(TabletMetadata tabletMetadata, 
Collection<CompactionJob> jobs) {
 +    jobQueues.add(tabletMetadata, jobs);
 +  }
 +
 +  public CompactionCoordinatorService.Iface getThriftService() {
 +    return this;
 +  }
 +
 +  class RefreshWriter {
 +
 +    private final ExternalCompactionId ecid;
 +    private final KeyExtent extent;
 +
 +    private RefreshEntry writtenEntry;
 +
 +    RefreshWriter(ExternalCompactionId ecid, KeyExtent extent) {
 +      this.ecid = ecid;
 +      this.extent = extent;
 +
 +      var dataLevel = Ample.DataLevel.of(extent.tableId());
 +      try {
 +        // Wait for any refresh entries from the previous manager process to 
be processed before
 +        // writing new ones.
 +        refreshLatches.get(dataLevel).await();
 +      } catch (InterruptedException e) {
 +        throw new RuntimeException(e);
 +      }
 +    }
 +
 +    public void writeRefresh(TabletMetadata.Location location) {
 +      Objects.requireNonNull(location);
 +
 +      if (writtenEntry != null) {
 +        if (location.getServerInstance().equals(writtenEntry.getTserver())) {
 +          // the location was already written so nothing to do
 +          return;
 +        } else {
 +          deleteRefresh();
 +        }
 +      }
 +
 +      var entry = new RefreshEntry(ecid, extent, 
location.getServerInstance());
 +
 +      
ctx.getAmple().refreshes(Ample.DataLevel.of(extent.tableId())).add(List.of(entry));
 +
 +      LOG.debug("wrote refresh entry for {}", ecid);
 +
 +      writtenEntry = entry;
 +    }
 +
 +    public void deleteRefresh() {
 +      if (writtenEntry != null) {
 +        ctx.getAmple().refreshes(Ample.DataLevel.of(extent.tableId()))
 +            .delete(List.of(writtenEntry));
 +        LOG.debug("deleted refresh entry for {}", ecid);
 +        writtenEntry = null;
 +      }
 +    }
 +  }
 +
 +  private Optional<CompactionConfig> 
getCompactionConfig(CompactionJobQueues.MetaJob metaJob) {
 +    if (metaJob.getJob().getKind() == CompactionKind.USER
 +        && metaJob.getTabletMetadata().getSelectedFiles() != null) {
 +      var cconf =
 +          
compactionConfigCache.get(metaJob.getTabletMetadata().getSelectedFiles().getFateTxId());
 +      return Optional.ofNullable(cconf);
 +    }
 +    return Optional.empty();
 +  }
 +
 +  /**
 +   * Compactors call this method when they have finished a compaction. This 
method does the
 +   * following.
 +   *
 +   * <ol>
 +   * <li>Reads the tablet's metadata and determines if the compaction can 
commit. It's possible that
 +   * things changed while the compaction was running and it can no longer 
commit.</li>
 +   * <li>If the compaction can commit then a ~refresh entry may be written to 
the metadata table.
 +   * This is done before attempting to commit to cover the case of process 
failure after commit. If
 +   * the manager dies after commit then when it restarts it will see the 
~refresh entry and refresh
 +   * that tablet. The ~refresh entry is only written when it's a system 
compaction on a tablet with a
 +   * location.</li>
 +   * <li>Commit the compaction using a conditional mutation. If the tablet's 
files or location
 +   * changed since reading the tablet's metadata, then the conditional mutation 
will fail. When this
 +   * happens it will reread the metadata and go back to step 1 conceptually. 
When committing a
 +   * compaction the compacted files are removed and scan entries are added to 
the tablet in case the
 +   * files are in use; this prevents GC from deleting the files between 
updating tablet metadata and
 +   * refreshing the tablet. The scan entries are only added when a tablet has 
a location.</li>
 +   * <li>After successful commit a refresh request is sent to the tablet if 
it has a location. This
 +   * will cause the tablet to start using the newly compacted files for 
future scans. Also the
 +   * tablet can delete the scan entries if there are no active scans using 
them.</li>
 +   * <li>If a ~refresh entry was written, delete it since the refresh was 
successful.</li>
 +   * </ol>
 +   *
 +   * <p>
 +   * User compactions will be refreshed as part of the fate operation. The 
user compaction fate
 +   * operation will see the compaction was committed after this code updates 
the tablet metadata,
 +   * however if it were to rely on this code to do the refresh it would not 
be able to know when the
 +   * refresh was actually done. Therefore, user compactions will refresh as 
part of the fate
 +   * operation so that it's known to be done before the fate operation 
returns. Since the fate
 +   * operation will do it, there is no need to do it here for user 
compactions.
 +   * </p>
 +   *
 +   * <p>
 +   * The ~refresh entries serve a similar purpose to FATE operations: they 
ensure that code executes
 +   * even when a process dies. FATE was intentionally not used for compaction 
commit because FATE
 +   * stores its data in zookeeper. The refresh entry is stored in the 
metadata table, which is much
 +   * more scalable than zookeeper. The number of system compactions of small 
files could be large
 +   * and this would be a large number of writes to zookeeper. Zookeeper 
scales somewhat with reads,
 +   * but not with writes.
 +   * </p>
 +   *
 +   * <p>
 +   * Issue #3559 was opened to explore the possibility of making compaction 
commit a fate operation
 +   * which would remove the need for the ~refresh section.
 +   * </p>
 +   *
 +   * @param tinfo trace info
 +   * @param credentials tcredentials object
 +   * @param externalCompactionId compaction id
 +   * @param textent tablet extent
 +   * @param stats compaction stats
 +   * @throws ThriftSecurityException when permission error
 +   */
 +  @Override
 +  public void compactionCompleted(TInfo tinfo, TCredentials credentials,
 +      String externalCompactionId, TKeyExtent textent, TCompactionStats stats)
 +      throws ThriftSecurityException {
 +    // do not expect users to call this directly, expect other tservers to 
call this method
 +    if (!security.canPerformSystemActions(credentials)) {
 +      throw new AccumuloSecurityException(credentials.getPrincipal(),
 +          SecurityErrorCode.PERMISSION_DENIED).asThriftException();
 +    }
 +
 +    var extent = KeyExtent.fromThrift(textent);
 +    LOG.info("Compaction completed, id: {}, stats: {}, extent: {}", 
externalCompactionId, stats,
 +        extent);
 +    final var ecid = ExternalCompactionId.of(externalCompactionId);
 +
 +    var tabletMeta =
 +        ctx.getAmple().readTablet(extent, ECOMP, SELECTED, LOCATION, FILES, 
COMPACTED, OPID);
 +
 +    if (!canCommitCompaction(ecid, tabletMeta)) {
 +      return;
 +    }
 +
 +    ExternalCompactionMetadata ecm = 
tabletMeta.getExternalCompactions().get(ecid);
 +
 +    // ELASTICITY_TODO this code does not handle race conditions or faults. 
Need to ensure refresh
 +    // happens in the case of manager process death between commit and 
refresh.
 +    ReferencedTabletFile newDatafile =
 +        
TabletNameGenerator.computeCompactionFileDest(ecm.getCompactTmpName());
 +
 +    Optional<ReferencedTabletFile> optionalNewFile;
 +    try {
 +      optionalNewFile = renameOrDeleteFile(stats, ecm, newDatafile);
 +    } catch (IOException e) {
 +      LOG.warn("Can not commit complete compaction {} because unable to 
delete or rename {} ", ecid,
 +          ecm.getCompactTmpName(), e);
 +      compactionFailed(Map.of(ecid, extent));
 +      return;
 +    }
 +
 +    RefreshWriter refreshWriter = new RefreshWriter(ecid, extent);
 +
 +    try {
 +      tabletMeta = commitCompaction(stats, ecid, tabletMeta, optionalNewFile, 
refreshWriter);
 +    } catch (RuntimeException e) {
 +      LOG.warn("Failed to commit complete compaction {} {}", ecid, extent, e);
 +      compactionFailed(Map.of(ecid, extent));
 +    }
 +
 +    if (ecm.getKind() != CompactionKind.USER) {
 +      refreshTablet(tabletMeta);
 +    }
 +
 +    // if a refresh entry was written, it can be removed now that the tablet has been refreshed
 +    refreshWriter.deleteRefresh();
 +
 +    // It's possible that RUNNING might not have an entry for this ecid in 
the case
 +    // of a coordinator restart when the Coordinator can't find the TServer 
for the
 +    // corresponding external compaction.
 +    recordCompletion(ecid);
 +
 +    // This will cause the tablet to be reexamined to see if it needs any more compactions.
 +    eventCoordinator.event(extent, "Compaction completed %s", extent);
 +  }
 +
 +  private Optional<ReferencedTabletFile> renameOrDeleteFile(TCompactionStats 
stats,
 +      ExternalCompactionMetadata ecm, ReferencedTabletFile newDatafile) 
throws IOException {
 +    if (stats.getEntriesWritten() == 0) {
 +      // the compaction produced no output, so there is no need to rename or add a file to the
 +      // metadata table; only delete the input files.
 +      if (!ctx.getVolumeManager().delete(ecm.getCompactTmpName().getPath())) {
 +        throw new IOException("delete returned false");
 +      }
 +
 +      return Optional.empty();
 +    } else {
 +      if (!ctx.getVolumeManager().rename(ecm.getCompactTmpName().getPath(),
 +          newDatafile.getPath())) {
 +        throw new IOException("rename returned false");
 +      }
 +
 +      return Optional.of(newDatafile);
 +    }
 +  }
 +
 +  private void refreshTablet(TabletMetadata metadata) {
 +    var location = metadata.getLocation();
 +    if (location != null) {
 +      KeyExtent extent = metadata.getExtent();
 +
 +      // there is a single tserver and a single tablet, so a thread pool is not needed. The direct
 +      // executor will run everything in the current thread.
 +      ExecutorService executorService = 
MoreExecutors.newDirectExecutorService();
 +      try {
 +        TabletRefresher.refreshTablets(executorService,
 +            "compaction:" + metadata.getExtent().toString(), ctx, 
tserverSet::getCurrentServers,
 +            Map.of(metadata.getLocation(), List.of(extent.toThrift())));
 +      } finally {
 +        executorService.shutdownNow();
 +      }
 +    }
 +  }
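    Illustrative aside: the direct executor used above is Guava's MoreExecutors.newDirectExecutorService(),
    which runs each submitted task synchronously in the calling thread, so no pool threads are created.
    A minimal, self-contained sketch of that behavior; the DirectExecutorSketch class name is made up for
    illustration and is not code from this diff.

    import com.google.common.util.concurrent.MoreExecutors;
    import java.util.concurrent.ExecutorService;

    public class DirectExecutorSketch {
      public static void main(String[] args) {
        ExecutorService direct = MoreExecutors.newDirectExecutorService();
        // the task runs in the calling thread before submit() returns, e.g. prints "ran in main"
        direct.submit(() -> System.out.println("ran in " + Thread.currentThread().getName()));
        direct.shutdownNow();
      }
    }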
 +
 +  // ELASTICITY_TODO unit test this method
 +  private boolean canCommitCompaction(ExternalCompactionId ecid, 
TabletMetadata tabletMetadata) {
 +
 +    if (tabletMetadata == null) {
 +      LOG.debug("Received completion notification for nonexistent tablet {}", 
ecid);
 +      return false;
 +    }
 +
 +    var extent = tabletMetadata.getExtent();
 +
 +    if (tabletMetadata.getOperationId() != null) {
 +      // split, merge, and delete tablet operations should delete the compaction entry in the tablet
 +      LOG.debug("Received completion notification for tablet with active 
operation {} {} {}", ecid,
 +          extent, tabletMetadata.getOperationId());
 +      return false;
 +    }
 +
 +    ExternalCompactionMetadata ecm = 
tabletMetadata.getExternalCompactions().get(ecid);
 +
 +    if (ecm == null) {
 +      LOG.debug("Received completion notification for unknown compaction {} 
{}", ecid, extent);
 +      return false;
 +    }
 +
 +    if (ecm.getKind() == CompactionKind.USER || ecm.getKind() == 
CompactionKind.SELECTOR) {
 +      if (tabletMetadata.getSelectedFiles() == null) {
 +        // when the compaction is canceled, selected files are deleted
 +        LOG.debug(
 +            "Received completion notification for user compaction and tablet 
has no selected files {} {}",
 +            ecid, extent);
 +        return false;
 +      }
 +
 +      if (ecm.getFateTxId() != tabletMetadata.getSelectedFiles().getFateTxId()) {
 +        // maybe the compaction was canceled and another user compaction was started on the tablet.
 +        LOG.debug(
 +            "Received completion notification for user compaction where its fate txid did not match the tablet's {} {} {} {}",
 +            ecid, extent, FateTxId.formatTid(ecm.getFateTxId()),
 +            FateTxId.formatTid(tabletMetadata.getSelectedFiles().getFateTxId()));
 +        return false;
 +      }
 +
 +      if 
(!tabletMetadata.getSelectedFiles().getFiles().containsAll(ecm.getJobFiles())) {
 +        // this is not expected to happen
 +        LOG.error("User compaction contained files not in the selected set {} 
{} {} {} {}",
 +            tabletMetadata.getExtent(), ecid, ecm.getKind(),
 +            
Optional.ofNullable(tabletMetadata.getSelectedFiles()).map(SelectedFiles::getFiles),
 +            ecm.getJobFiles());
 +        return false;
 +      }
 +    }
 +
 +    if (!tabletMetadata.getFiles().containsAll(ecm.getJobFiles())) {
 +      // this is not expected to happen
 +      LOG.error("Compaction contained files not in the tablet files set {} {} 
{} {}",
 +          tabletMetadata.getExtent(), ecid, tabletMetadata.getFiles(), 
ecm.getJobFiles());
 +      return false;
 +    }
 +
 +    return true;
 +  }
 +
 +  private TabletMetadata commitCompaction(TCompactionStats stats, 
ExternalCompactionId ecid,
 +      TabletMetadata tablet, Optional<ReferencedTabletFile> newDatafile,
 +      RefreshWriter refreshWriter) {
 +
 +    KeyExtent extent = tablet.getExtent();
 +
 +    Retry retry = Retry.builder().infiniteRetries().retryAfter(100, 
MILLISECONDS)
 +        .incrementBy(100, MILLISECONDS).maxWait(10, 
SECONDS).backOffFactor(1.5)
 +        .logInterval(3, MINUTES).createRetry();
 +
 +    while (canCommitCompaction(ecid, tablet)) {
 +      ExternalCompactionMetadata ecm = 
tablet.getExternalCompactions().get(ecid);
 +
 +      // the compaction's new output file should not already exist in the tablet
 +      var tablet2 = tablet;
 +      newDatafile.ifPresent(
 +          newFile -> 
Preconditions.checkState(!tablet2.getFiles().contains(newFile.insert()),
 +              "File already exists in tablet %s %s", newFile, 
tablet2.getFiles()));
 +
 +      if (tablet.getLocation() != null
 +          && tablet.getExternalCompactions().get(ecid).getKind() != 
CompactionKind.USER) {
 +        // Write the refresh entry before attempting to update tablet metadata; this ensures that
 +        // the refresh will happen even if this process dies. In the case where this process does
 +        // not die, the refresh will happen after commit. User compactions will make refresh calls
 +        // in their fate operation, so it does not need to be done here.
 +        refreshWriter.writeRefresh(tablet.getLocation());
 +      }
 +
 +      try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) {
 +        var tabletMutator = 
tabletsMutator.mutateTablet(extent).requireAbsentOperation()
 +            .requireCompaction(ecid).requireSame(tablet, FILES, LOCATION);
 +
 +        if (ecm.getKind() == CompactionKind.USER || ecm.getKind() == 
CompactionKind.SELECTOR) {
 +          tabletMutator.requireSame(tablet, SELECTED, COMPACTED);
 +        }
 +
 +        // make the needed updates to the tablet
 +        updateTabletForCompaction(stats, ecid, tablet, newDatafile, extent, 
ecm, tabletMutator);
 +
 +        tabletMutator
 +            .submit(tabletMetadata -> 
!tabletMetadata.getExternalCompactions().containsKey(ecid));
 +
 +        // TODO expensive logging
 +        LOG.debug("Compaction completed {} added {} removed {}", 
tablet.getExtent(), newDatafile,
 +            ecm.getJobFiles().stream().map(AbstractTabletFile::getFileName)
 +                .collect(Collectors.toList()));
 +
 +        // ELASTICITY_TODO check return value and retry, could fail because 
of race conditions
 +        var result = tabletsMutator.process().get(extent);
 +        if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) {
 +          // compaction was committed, mark the compaction input files for 
deletion
 +          //
 +          // ELASTICITY_TODO in the case of process death the GC candidates would never be added,
 +          // like #3811. If compaction commit were moved to FATE per #3559 then this would not
 +          // be an issue. If compaction commit is never moved to FATE, then this addition could be
 +          // moved to the compaction refresh process. The compaction refresh process will go away
 +          // if compaction commit is moved to FATE, so this should only be done if not moving to FATE.
 +          ctx.getAmple().putGcCandidates(extent.tableId(), ecm.getJobFiles());
 +          break;
 +        } else {
 +          // the compaction failed to commit, maybe something changed on the tablet, so let's
 +          // reread the metadata and try again
 +          tablet = result.readMetadata();
 +        }
 +
 +        retry.waitForNextAttempt(LOG, "Failed to commit " + ecid + " for 
tablet " + extent);
 +      } catch (InterruptedException e) {
 +        throw new RuntimeException(e);
 +      }
 +    }
 +
 +    return tablet;
 +  }
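    Illustrative aside: the commit loop above paces its attempts with the Retry object built at the top
    of the method. A minimal sketch of that pacing pattern; the org.apache.accumulo.core.util.Retry
    import path and the TimeUnit static imports are assumptions, and tryCommit() is a hypothetical
    stand-in for submitting the conditional mutation and checking its status.

    import static java.util.concurrent.TimeUnit.MILLISECONDS;
    import static java.util.concurrent.TimeUnit.MINUTES;
    import static java.util.concurrent.TimeUnit.SECONDS;

    import org.apache.accumulo.core.util.Retry; // assumed package for the Retry class seen in the diff
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class RetryLoopSketch {
      private static final Logger LOG = LoggerFactory.getLogger(RetryLoopSketch.class);

      // hypothetical stand-in for the conditional mutation result check
      private static boolean tryCommit() {
        return Math.random() < 0.3;
      }

      public static void main(String[] args) throws InterruptedException {
        Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS)
            .incrementBy(100, MILLISECONDS).maxWait(10, SECONDS).backOffFactor(1.5)
            .logInterval(3, MINUTES).createRetry();
        while (!tryCommit()) {
          // sleeps with a growing backoff between attempts, capped by maxWait
          retry.waitForNextAttempt(LOG, "retrying commit");
        }
      }
    }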
 +
 +  private void updateTabletForCompaction(TCompactionStats stats, 
ExternalCompactionId ecid,
 +      TabletMetadata tablet, Optional<ReferencedTabletFile> newDatafile, 
KeyExtent extent,
 +      ExternalCompactionMetadata ecm, Ample.ConditionalTabletMutator 
tabletMutator) {
 +    // ELASTICITY_TODO improve logging adapt to use existing tablet files 
logging
 +    if (ecm.getKind() == CompactionKind.USER) {
 +      if (tablet.getSelectedFiles().getFiles().equals(ecm.getJobFiles())) {
 +        // all files selected for the user compaction are finished, so the tablet is finished and
 +        // its compacted column needs to be updated.
 +
 +        long fateTxId = tablet.getSelectedFiles().getFateTxId();
 +
 +        Preconditions.checkArgument(!tablet.getCompacted().contains(fateTxId),
 +            "Tablet %s unexpected has selected files and compacted columns 
for %s",
 +            tablet.getExtent(), fateTxId);
 +
 +        // TODO set to trace
 +        LOG.debug("All selected files compcated for {} setting compacted for 
{}",
 +            tablet.getExtent(), 
FateTxId.formatTid(tablet.getSelectedFiles().getFateTxId()));
 +
 +        tabletMutator.deleteSelectedFiles();
 +        tabletMutator.putCompacted(fateTxId);
 +
 +      } else {
 +        // not all of the selected files were finished, so remove the finished files from the
 +        // selected set and add the new file (if any) to it
 +
 +        Set<StoredTabletFile> newSelectedFileSet =
 +            new HashSet<>(tablet.getSelectedFiles().getFiles());
 +        newSelectedFileSet.removeAll(ecm.getJobFiles());
 +
 +        if (newDatafile.isPresent()) {
 +          // TODO set to trace
 +          LOG.debug(
 +              "Not all selected files for {} are done, adding new selected 
file {} from compaction",
 +              tablet.getExtent(), 
newDatafile.orElseThrow().getPath().getName());
 +          newSelectedFileSet.add(newDatafile.orElseThrow().insert());
 +        } else {
 +          // TODO set to trace
 +          LOG.debug(
 +              "Not all selected files for {} are done, compaction produced no 
output so not adding to selected set.",
 +              tablet.getExtent());
 +        }
 +
 +        tabletMutator.putSelectedFiles(
 +            new SelectedFiles(newSelectedFileSet, 
tablet.getSelectedFiles().initiallySelectedAll(),
 +                tablet.getSelectedFiles().getFateTxId()));
 +      }
 +    }
 +
 +    if (tablet.getLocation() != null) {
 +      // add scan entries to prevent GC in case the hosted tablet is 
currently using the files for
 +      // scan
 +      ecm.getJobFiles().forEach(tabletMutator::putScan);
 +    }
 +    ecm.getJobFiles().forEach(tabletMutator::deleteFile);
 +    tabletMutator.deleteExternalCompaction(ecid);
 +
 +    if (newDatafile.isPresent()) {
 +      tabletMutator.putFile(newDatafile.orElseThrow(),
 +          new DataFileValue(stats.getFileSize(), stats.getEntriesWritten()));
 +    }
 +  }
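    Illustrative aside: when only part of the selected set was compacted, the branch above shrinks the
    selected set by the job files and adds the compaction output, so the remaining user compaction work
    still covers all of the selected data. A small sketch of that set arithmetic; the file names are
    made up and plain JDK string sets stand in for StoredTabletFile.

    import java.util.HashSet;
    import java.util.Set;

    public class SelectedFilesSketch {
      public static void main(String[] args) {
        Set<String> selected = new HashSet<>(Set.of("F1.rf", "F2.rf", "F3.rf", "F4.rf"));
        Set<String> jobFiles = Set.of("F1.rf", "F2.rf"); // inputs consumed by this compaction

        selected.removeAll(jobFiles); // drop the consumed inputs from the selected set
        selected.add("C1.rf");        // add the compaction output (skipped when no output was produced)

        // prints the three remaining selected files: F3.rf, F4.rf and C1.rf (iteration order may vary)
        System.out.println(selected);
      }
    }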
 +
 +  @Override
 +  public void compactionFailed(TInfo tinfo, TCredentials credentials, String 
externalCompactionId,
 +      TKeyExtent extent) throws ThriftSecurityException {
 +    // do not expect users to call this directly, expect other tservers to 
call this method
 +    if (!security.canPerformSystemActions(credentials)) {
 +      throw new AccumuloSecurityException(credentials.getPrincipal(),
 +          SecurityErrorCode.PERMISSION_DENIED).asThriftException();
 +    }
 +    LOG.info("Compaction failed, id: {}", externalCompactionId);
 +    final var ecid = ExternalCompactionId.of(externalCompactionId);
 +    compactionFailed(Map.of(ecid, KeyExtent.fromThrift(extent)));
 +  }
 +
 +  void compactionFailed(Map<ExternalCompactionId,KeyExtent> compactions) {
 +
 +    try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) {
 +      compactions.forEach((ecid, extent) -> {
 +        try {
 +          ctx.requireNotDeleted(extent.tableId());
 +          
tabletsMutator.mutateTablet(extent).requireAbsentOperation().requireCompaction(ecid)
 +              .deleteExternalCompaction(ecid).submit(new RejectionHandler() {
 +
 +                @Override
 +                public boolean callWhenTabletDoesNotExists() {
 +                  return true;
 +                }
 +
 +                @Override
 +                public boolean test(TabletMetadata tabletMetadata) {
 +                  return tabletMetadata == null
 +                      || 
!tabletMetadata.getExternalCompactions().containsKey(ecid);
 +                }
 +
 +              });
 +        } catch (TableDeletedException e) {
 +          LOG.warn("Table {} was deleted, unable to update metadata for 
compaction failure.",
 +              extent.tableId());
 +        }
 +      });
 +
 +      final List<ExternalCompactionId> ecidsForTablet = new ArrayList<>();
 +      tabletsMutator.process().forEach((extent, result) -> {
 +        if (result.getStatus() != Ample.ConditionalResult.Status.ACCEPTED) {
 +
 +          // this should try again later when the dead compaction detector runs; let's log it in
 +          // case it's a persistent problem
 +          if (LOG.isDebugEnabled()) {
 +            var ecid =
 +                compactions.entrySet().stream().filter(entry -> 
entry.getValue().equals(extent))
 +                    .findFirst().map(Map.Entry::getKey).orElse(null);
 +            LOG.debug("Unable to remove failed compaction {} {}", extent, 
ecid);
 +          }
 +        } else {
 +          // compactionFailed is called from the Compactor when either a 
compaction fails or
 +          // is cancelled and it's called from the DeadCompactionDetector. 
This block is
 +          // entered when the conditional mutator above successfully deletes 
an ecid from
 +          // the tablet metadata. Remove compaction tmp files from the tablet 
directory
 +          // that have a corresponding ecid in the name.
 +
 +          ecidsForTablet.clear();
 +          compactions.entrySet().stream().filter(e -> 
e.getValue().compareTo(extent) == 0)
 +              .map(Entry::getKey).forEach(ecidsForTablet::add);
 +
 +          if (!ecidsForTablet.isEmpty()) {
 +            final TabletMetadata tm = ctx.getAmple().readTablet(extent, 
ColumnType.DIR);
 +            if (tm != null) {
 +              final Collection<Volume> vols = 
ctx.getVolumeManager().getVolumes();
 +              for (Volume vol : vols) {
 +                try {
 +                  final String volPath =
 +                      vol.getBasePath() + Constants.HDFS_TABLES_DIR + 
Path.SEPARATOR
 +                          + extent.tableId().canonical() + Path.SEPARATOR + 
tm.getDirName();
 +                  final FileSystem fs = vol.getFileSystem();
 +                  for (ExternalCompactionId ecid : ecidsForTablet) {
 +                    final String fileSuffix = "_tmp_" + ecid.canonical();
 +                    FileStatus[] files = fs.listStatus(new Path(volPath), 
(path) -> {
 +                      return path.getName().endsWith(fileSuffix);
 +                    });
 +                    if (files.length > 0) {
 +                      for (FileStatus file : files) {
 +                        if (!fs.delete(file.getPath(), false)) {
 +                          LOG.warn("Unable to delete ecid tmp file: {}: ", 
file.getPath());
 +                        } else {
 +                          LOG.debug("Deleted ecid tmp file: {}", 
file.getPath());
 +                        }
 +                      }
 +                    }
 +                  }
 +                } catch (IOException e) {
 +                  LOG.error("Exception deleting compaction tmp files for 
tablet: {}", extent, e);
 +                }
 +              }
 +            } else {
 +              // TabletMetadata does not exist for the extent. This could be 
due to a merge or
 +              // split operation. Use the utility to find tmp files at the 
table level
 +              deadCompactionDetector.addTableId(extent.tableId());
 +            }
 +          }
 +        }
 +      });
 +    }
 +
 +    compactions.forEach((k, v) -> recordCompletion(k));
 +  }
 +
 +  /**
 +   * Compactor calls to update the status of the assigned compaction
 +   *
 +   * @param tinfo trace info
 +   * @param credentials tcredentials object
 +   * @param externalCompactionId compaction id
 +   * @param update compaction status update
 +   * @param timestamp timestamp of the message
 +   * @throws ThriftSecurityException when permission error
 +   */
 +  @Override
 +  public void updateCompactionStatus(TInfo tinfo, TCredentials credentials,
 +      String externalCompactionId, TCompactionStatusUpdate update, long 
timestamp)
 +      throws ThriftSecurityException {
 +    // do not expect users to call this directly, expect other tservers to 
call this method
 +    if (!security.canPerformSystemActions(credentials)) {
 +      throw new AccumuloSecurityException(credentials.getPrincipal(),
 +          SecurityErrorCode.PERMISSION_DENIED).asThriftException();
 +    }
 +    LOG.debug("Compaction status update, id: {}, timestamp: {}, update: {}", 
externalCompactionId,
 +        timestamp, update);
 +    final RunningCompaction rc = 
RUNNING_CACHE.get(ExternalCompactionId.of(externalCompactionId));
 +    if (null != rc) {
 +      rc.addUpdate(timestamp, update);
 +    }
 +  }
 +
 +  private void recordCompletion(ExternalCompactionId ecid) {
 +    var rc = RUNNING_CACHE.remove(ecid);
 +    if (rc != null) {
 +      completed.put(ecid, rc);
 +    }
 +  }
 +
 +  protected Set<ExternalCompactionId> readExternalCompactionIds() {
 +    return 
this.ctx.getAmple().readTablets().forLevel(Ample.DataLevel.USER).fetch(ECOMP).build()
 +        .stream().flatMap(tm -> tm.getExternalCompactions().keySet().stream())
 +        .collect(Collectors.toSet());
 +  }
 +
 +  /**
 +   * The RUNNING_CACHE map may contain external compactions that are not actually running. This
 +   * method periodically cleans those up.
 +   */
 +  protected void cleanUpRunning() {
 +
 +    // grab a snapshot of the ids in the set before reading the metadata 
table. This is done to
 +    // avoid removing things that are added while reading the metadata.
 +    Set<ExternalCompactionId> idsSnapshot = 
Set.copyOf(RUNNING_CACHE.keySet());
 +
 +    // grab the ids that are listed as running in the metadata table. It is important that this is
 +    // done after getting the snapshot.
 +    Set<ExternalCompactionId> idsInMetadata = readExternalCompactionIds();
 +
 +    var idsToRemove = Sets.difference(idsSnapshot, idsInMetadata);
 +
 +    // remove ids that are in the running set but not in the metadata table
 +    idsToRemove.forEach(this::recordCompletion);
 +
 +    if (idsToRemove.size() > 0) {
 +      LOG.debug("Removed stale entries from RUNNING_CACHE : {}", idsToRemove);
 +    }
 +  }
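    Illustrative aside: the snapshot-then-read ordering above matters because the difference is computed
    against ids that were in RUNNING_CACHE before the metadata scan started, so ids added during the scan
    can never be removed. A tiny sketch of the Guava Sets.difference semantics; the ecid strings are
    made up.

    import com.google.common.collect.Sets;
    import java.util.Set;

    public class RunningCleanupSketch {
      public static void main(String[] args) {
        Set<String> snapshot = Set.of("ECID:1", "ECID:2", "ECID:3");   // RUNNING_CACHE keys at snapshot time
        Set<String> inMetadata = Set.of("ECID:2", "ECID:3", "ECID:4"); // ids still referenced by tablets

        // elements in the snapshot that are no longer in the metadata table -> only ECID:1
        System.out.println(Sets.difference(snapshot, inMetadata));
      }
    }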
 +
 +  /**
 +   * Return information about running compactions
 +   *
 +   * @param tinfo trace info
 +   * @param credentials tcredentials object
 +   * @return map of ECID to TExternalCompaction objects
 +   * @throws ThriftSecurityException permission error
 +   */
 +  @Override
 +  public TExternalCompactionList getRunningCompactions(TInfo tinfo, 
TCredentials credentials)
 +      throws ThriftSecurityException {
 +    // do not expect users to call this directly, expect other tservers to 
call this method
 +    if (!security.canPerformSystemActions(credentials)) {
 +      throw new AccumuloSecurityException(credentials.getPrincipal(),
 +          SecurityErrorCode.PERMISSION_DENIED).asThriftException();
 +    }
 +
 +    final TExternalCompactionList result = new TExternalCompactionList();
 +    RUNNING_CACHE.forEach((ecid, rc) -> {
 +      TExternalCompaction trc = new TExternalCompaction();
 +      trc.setGroupName(rc.getGroupName());
 +      trc.setCompactor(rc.getCompactorAddress());
 +      trc.setUpdates(rc.getUpdates());
 +      trc.setJob(rc.getJob());
 +      result.putToCompactions(ecid.canonical(), trc);
 +    });
 +    return result;
 +  }
 +
 +  /**
 +   * Return information about recently completed compactions
 +   *
 +   * @param tinfo trace info
 +   * @param credentials tcredentials object
 +   * @return map of ECID to TExternalCompaction objects
 +   * @throws ThriftSecurityException permission error
 +   */
 +  @Override
 +  public TExternalCompactionList getCompletedCompactions(TInfo tinfo, 
TCredentials credentials)
 +      throws ThriftSecurityException {
 +    // do not expect users to call this directly, expect other tservers to 
call this method
 +    if (!security.canPerformSystemActions(credentials)) {
 +      throw new AccumuloSecurityException(credentials.getPrincipal(),
 +          SecurityErrorCode.PERMISSION_DENIED).asThriftException();
 +    }
 +    final TExternalCompactionList result = new TExternalCompactionList();
 +    completed.asMap().forEach((ecid, rc) -> {
 +      TExternalCompaction trc = new TExternalCompaction();
 +      trc.setGroupName(rc.getGroupName());
 +      trc.setCompactor(rc.getCompactorAddress());
 +      trc.setJob(rc.getJob());
 +      trc.setUpdates(rc.getUpdates());
 +      result.putToCompactions(ecid.canonical(), trc);
 +    });
 +    return result;
 +  }
 +
 +  @Override
 +  public void cancel(TInfo tinfo, TCredentials credentials, String 
externalCompactionId)
 +      throws TException {
 +    var runningCompaction = 
RUNNING_CACHE.get(ExternalCompactionId.of(externalCompactionId));
 +    var extent = KeyExtent.fromThrift(runningCompaction.getJob().getExtent());
 +    try {
 +      NamespaceId nsId = this.ctx.getNamespaceId(extent.tableId());
 +      if (!security.canCompact(credentials, extent.tableId(), nsId)) {
 +        throw new AccumuloSecurityException(credentials.getPrincipal(),
 +            SecurityErrorCode.PERMISSION_DENIED).asThriftException();
 +      }
 +    } catch (TableNotFoundException e) {
 +      throw new ThriftTableOperationException(extent.tableId().canonical(), 
null,
 +          TableOperation.COMPACT_CANCEL, 
TableOperationExceptionType.NOTFOUND, e.getMessage());
 +    }
 +
 +    cancelCompactionOnCompactor(runningCompaction.getCompactorAddress(), 
externalCompactionId);
 +  }
 +
 +  /* Method exists to be overridden in test to hide static method */
 +  protected String getTServerAddressString(HostAndPort tserverAddress) {
 +    return ExternalCompactionUtil.getHostPortString(tserverAddress);
 +  }
 +
 +  /* Method exists to be overridden in test to hide static method */
 +  protected List<RunningCompaction> getCompactionsRunningOnCompactors() {
 +    return ExternalCompactionUtil.getCompactionsRunningOnCompactors(this.ctx);
 +  }
 +
 +  /* Method exists to be overridden in test to hide static method */
 +  protected void cancelCompactionOnCompactor(String address, String 
externalCompactionId) {
 +    HostAndPort hostPort = HostAndPort.fromString(address);
 +    ExternalCompactionUtil.cancelCompaction(this.ctx, hostPort, 
externalCompactionId);
 +  }
 +
 +  /* Method exists to be overridden in test to hide static method */
 +  protected void returnTServerClient(TabletServerClientService.Client client) 
{
 +    ThriftUtil.returnClient(client, this.ctx);
 +  }
 +
 +  private void deleteEmpty(ZooReaderWriter zoorw, String path)
 +      throws KeeperException, InterruptedException {
 +    try {
 +      LOG.debug("Deleting empty ZK node {}", path);
 +      zoorw.delete(path);
 +    } catch (KeeperException.NotEmptyException e) {
 +      LOG.debug("Failed to delete {} its not empty, likely an expected race 
condition.", path);
 +    }
 +  }
 +
 +  private void cleanUpCompactors() {
 +    final String compactorQueuesPath = this.ctx.getZooKeeperRoot() + 
Constants.ZCOMPACTORS;
 +
 +    var zoorw = this.ctx.getZooReaderWriter();
 +
 +    try {
 +      var groups = zoorw.getChildren(compactorQueuesPath);
 +
 +      for (String group : groups) {
 +        String qpath = compactorQueuesPath + "/" + group;
 +
 +        var compactors = zoorw.getChildren(qpath);
 +
 +        if (compactors.isEmpty()) {
 +          deleteEmpty(zoorw, qpath);
 +        }
 +
 +        for (String compactor : compactors) {
 +          String cpath = compactorQueuesPath + "/" + group + "/" + compactor;
 +          var lockNodes = zoorw.getChildren(compactorQueuesPath + "/" + group 
+ "/" + compactor);
 +          if (lockNodes.isEmpty()) {
 +            deleteEmpty(zoorw, cpath);
 +          }
 +        }
 +      }
 +
 +    } catch (KeeperException | RuntimeException e) {
 +      LOG.warn("Failed to clean up compactors", e);
 +    } catch (InterruptedException e) {
 +      Thread.currentThread().interrupt();
 +      throw new IllegalStateException(e);
 +    }
 +  }
 +
 +}
diff --cc 
test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
index 7928b511d5,c6b194e8c7..76e9487708
--- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
@@@ -19,15 -19,26 +19,24 @@@
  package org.apache.accumulo.test.functional;
  
  import static java.util.concurrent.TimeUnit.SECONDS;
 -import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
 +import static java.util.stream.Collectors.toList;
 +import static java.util.stream.Collectors.toSet;
  import static org.junit.jupiter.api.Assertions.assertEquals;
  import static org.junit.jupiter.api.Assertions.assertFalse;
+ import static org.junit.jupiter.api.Assertions.assertNotNull;
 -import static org.junit.jupiter.api.Assertions.assertThrows;
  import static org.junit.jupiter.api.Assertions.assertTrue;
 -import static org.junit.jupiter.api.Assertions.fail;
  
  import java.io.IOException;
+ import java.io.UncheckedIOException;
+ import java.nio.file.FileVisitResult;
+ import java.nio.file.Files;
+ import java.nio.file.Paths;
+ import java.nio.file.SimpleFileVisitor;
+ import java.nio.file.attribute.BasicFileAttributes;
  import java.time.Duration;
 -import java.util.ArrayList;
+ import java.util.Arrays;
  import java.util.EnumSet;
+ import java.util.HashMap;
  import java.util.HashSet;
  import java.util.List;
  import java.util.Map;
@@@ -49,14 -62,19 +58,16 @@@ import org.apache.accumulo.core.client.
  import org.apache.accumulo.core.client.admin.CompactionConfig;
  import org.apache.accumulo.core.client.admin.NewTableConfiguration;
  import org.apache.accumulo.core.client.admin.PluginConfig;
 -import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
+ import org.apache.accumulo.core.client.admin.compaction.CompactionConfigurer;
  import org.apache.accumulo.core.client.admin.compaction.CompactionSelector;
 -import org.apache.accumulo.core.client.admin.compaction.CompressionConfigurer;
  import org.apache.accumulo.core.clientImpl.ClientContext;
 -import org.apache.accumulo.core.clientImpl.TableOperationsImpl;
  import org.apache.accumulo.core.conf.Property;
  import org.apache.accumulo.core.data.Key;
  import org.apache.accumulo.core.data.Mutation;
  import org.apache.accumulo.core.data.TableId;
  import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.dataImpl.KeyExtent;
+ import org.apache.accumulo.core.file.rfile.bcfile.PrintBCInfo;
 -import org.apache.accumulo.core.iterators.DevNull;
  import org.apache.accumulo.core.iterators.Filter;
  import org.apache.accumulo.core.iterators.IteratorEnvironment;
  import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;

