This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 40c5f8edbf6a10366d9ce9728fd0a4c7211503c4
Merge: 8a9c10724c 187733f634
Author: Christopher Tubbs <ctubb...@apache.org>
AuthorDate: Mon Feb 10 20:49:43 2025 -0500

    Merge branch '3.1'

 .../apache/accumulo/core/clientImpl/TableOperationsImpl.java  |  1 -
 .../src/main/java/org/apache/accumulo/core/data/LoadPlan.java |  2 ++
 .../java/org/apache/accumulo/core/fate/user/FateMutator.java  | 11 ++++++-----
 .../org/apache/accumulo/core/fate/user/UserFateStore.java     |  5 +++--
 .../java/org/apache/accumulo/core/metadata/schema/Ample.java  |  9 +++++----
 .../accumulo/core/util/compaction/CompactionPlanImpl.java     |  5 +----
 .../java/org/apache/accumulo/core/zookeeper/ZooCache.java     | 11 +----------
 .../core/spi/compaction/RatioBasedCompactionPlannerTest.java  |  4 ++--
 .../accumulo/server/compaction/CompactionJobGenerator.java    |  2 +-
 .../org/apache/accumulo/server/util/ServiceStatusCmd.java     |  1 -
 .../apache/accumulo/server/util/checkCommand/CheckRunner.java |  3 ++-
 .../apache/accumulo/test/functional/GracefulShutdownIT.java   |  1 +
 .../apache/accumulo/test/functional/MemoryStarvedScanIT.java  |  3 ++-
 13 files changed, 26 insertions(+), 32 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
index ff73e72017,0baf9b7e9e..6acf4758d2
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
@@@ -105,10 -86,6 +105,9 @@@ import org.apache.accumulo.core.client.
  import org.apache.accumulo.core.client.admin.NewTableConfiguration;
  import org.apache.accumulo.core.client.admin.SummaryRetriever;
  import org.apache.accumulo.core.client.admin.TableOperations;
- import org.apache.accumulo.core.client.admin.TableOperations.ImportDestinationArguments;
 +import org.apache.accumulo.core.client.admin.TabletAvailability;
 +import org.apache.accumulo.core.client.admin.TabletInformation;
 +import org.apache.accumulo.core.client.admin.TabletMergeability;
  import org.apache.accumulo.core.client.admin.TimeType;
  import org.apache.accumulo.core.client.admin.compaction.CompactionConfigurer;
  import org.apache.accumulo.core.client.admin.compaction.CompactionSelector;
diff --cc core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java
index ac675d7fb9,0000000000..0e8b4344ed
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java
@@@ -1,103 -1,0 +1,104 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.core.fate.user;
 +
 +import org.apache.accumulo.core.fate.Fate;
 +import org.apache.accumulo.core.fate.FateKey;
 +import org.apache.accumulo.core.fate.FateStore;
 +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
 +import org.apache.accumulo.core.fate.Repo;
- import org.apache.accumulo.core.fate.user.schema.FateSchema;
 +
 +public interface FateMutator<T> {
 +
 +  FateMutator<T> putStatus(TStatus status);
 +
 +  FateMutator<T> putKey(FateKey fateKey);
 +
 +  FateMutator<T> putCreateTime(long ctime);
 +
 +  /**
 +   * Requires that nothing exists for this fate mutation.
 +   */
 +  FateMutator<T> requireAbsent();
 +
 +  /**
 +   * Require that the transaction status is one of the given statuses. If no statuses are provided,
 +   * require that the status column is absent.
 +   *
 +   * @param statuses The statuses to check against.
 +   */
 +  FateMutator<T> requireStatus(TStatus... statuses);
 +
 +  /**
 +   * Require the transaction has no reservation.
 +   */
 +  FateMutator<T> requireUnreserved();
 +
 +  /**
 +   * Require the transaction has no fate key set.
 +   */
 +  FateMutator<T> requireAbsentKey();
 +
 +  /**
-    * Add a conditional mutation to {@link FateSchema.TxColumnFamily#RESERVATION_COLUMN} that will
-    * put the reservation if there is not already a reservation present
++   * Add a conditional mutation to
++   * {@link org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily#RESERVATION_COLUMN}
++   * that will put the reservation if there is not already a reservation present
 +   *
 +   * @param reservation the reservation to attempt to put
 +   * @return the FateMutator with this added mutation
 +   */
 +  FateMutator<T> putReservedTx(FateStore.FateReservation reservation);
 +
 +  /**
-    * Add a conditional mutation to {@link FateSchema.TxColumnFamily#RESERVATION_COLUMN} that will
-    * delete the column if the column value matches the given reservation
++   * Add a conditional mutation to
++   * {@link org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily#RESERVATION_COLUMN}
++   * that will delete the column if the column value matches the given reservation
 +   *
 +   * @param reservation the reservation to attempt to remove
 +   * @return the FateMutator with this added mutation
 +   */
 +  FateMutator<T> putUnreserveTx(FateStore.FateReservation reservation);
 +
 +  FateMutator<T> putName(byte[] data);
 +
 +  FateMutator<T> putAutoClean(byte[] data);
 +
 +  FateMutator<T> putException(byte[] data);
 +
 +  FateMutator<T> putReturnValue(byte[] data);
 +
 +  FateMutator<T> putAgeOff(byte[] data);
 +
 +  FateMutator<T> putTxInfo(Fate.TxInfo txInfo, byte[] data);
 +
 +  FateMutator<T> putRepo(int position, Repo<T> repo);
 +
 +  FateMutator<T> deleteRepo(int position);
 +
 +  void mutate();
 +
 +  // This exists to represent the subset of statuses from ConditionalWriter.Status that are expected
 +  // and need to be handled.
 +  enum Status {
 +    ACCEPTED, REJECTED, UNKNOWN
 +  }
 +
 +  Status tryMutate();
 +
 +}
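
For context, a minimal sketch (not part of the commit) of how this conditional-mutation interface composes: conditions such as requireStatus are chained with updates such as putStatus, then applied atomically with tryMutate(). The FateMutatorImpl construction matches its use in UserFateStore below; the table name and the fateId value here are hypothetical.

    // Hypothetical usage: move NEW -> SUBMITTED only if the status is still NEW.
    FateMutator<String> mutator = new FateMutatorImpl<>(context, "accumulo.fate", fateId);
    FateMutator.Status status =
        mutator.requireStatus(TStatus.NEW).putStatus(TStatus.SUBMITTED).tryMutate();
    if (status == FateMutator.Status.UNKNOWN) {
      // The write may or may not have happened; callers must re-check or retry,
      // as UserFateStore.seedTransaction does below.
    }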
diff --cc core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
index 977adb2440,0000000000..8268163dad
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
@@@ -1,555 -1,0 +1,556 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.core.fate.user;
 +
 +import java.io.IOException;
 +import java.io.Serializable;
 +import java.util.EnumSet;
 +import java.util.List;
 +import java.util.Map.Entry;
 +import java.util.Objects;
 +import java.util.Optional;
 +import java.util.SortedMap;
 +import java.util.UUID;
 +import java.util.function.Function;
 +import java.util.function.Predicate;
 +import java.util.function.Supplier;
 +import java.util.stream.Collectors;
 +import java.util.stream.Stream;
 +
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.clientImpl.ClientContext;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.fate.AbstractFateStore;
 +import org.apache.accumulo.core.fate.Fate;
 +import org.apache.accumulo.core.fate.Fate.TxInfo;
 +import org.apache.accumulo.core.fate.FateId;
 +import org.apache.accumulo.core.fate.FateInstanceType;
 +import org.apache.accumulo.core.fate.FateKey;
 +import org.apache.accumulo.core.fate.ReadOnlyRepo;
 +import org.apache.accumulo.core.fate.Repo;
 +import org.apache.accumulo.core.fate.StackOverflowException;
 +import org.apache.accumulo.core.fate.user.schema.FateSchema.RepoColumnFamily;
 +import org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily;
 +import org.apache.accumulo.core.fate.user.schema.FateSchema.TxInfoColumnFamily;
 +import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
 +import org.apache.accumulo.core.iterators.user.WholeRowIterator;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.util.ColumnFQ;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.hadoop.io.Text;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.base.Preconditions;
 +
 +public class UserFateStore<T> extends AbstractFateStore<T> {
 +
 +  private static final Logger log = LoggerFactory.getLogger(UserFateStore.class);
 +
 +  private final ClientContext context;
 +  private final String tableName;
 +
 +  private static final FateInstanceType fateInstanceType = FateInstanceType.USER;
 +  private static final com.google.common.collect.Range<Integer> REPO_RANGE =
 +      com.google.common.collect.Range.closed(1, MAX_REPOS);
 +
 +  /**
 +   * Constructs a UserFateStore
 +   *
 +   * @param context the {@link ClientContext}
 +   * @param tableName the name of the table which will store the Fate data
-    * @param lockID the {@link ZooUtil.LockID} held by the process creating this store. Should be
-    *        null if this store will be used as read-only (will not be used to reserve transactions)
++   * @param lockID the {@link org.apache.accumulo.core.fate.zookeeper.ZooUtil.LockID} held by the
++   *        process creating this store. Should be null if this store will be used as read-only
++   *        (will not be used to reserve transactions)
 +   * @param isLockHeld the {@link Predicate} used to determine if the lockID is held or not at the
 +   *        time of invocation. If the store is used for a {@link Fate} which runs a dead
 +   *        reservation cleaner, this should be non-null, otherwise null is fine
 +   */
 +  public UserFateStore(ClientContext context, String tableName, ZooUtil.LockID lockID,
 +      Predicate<ZooUtil.LockID> isLockHeld) {
 +    this(context, tableName, lockID, isLockHeld, DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR);
 +  }
 +
 +  @VisibleForTesting
 +  public UserFateStore(ClientContext context, String tableName, ZooUtil.LockID lockID,
 +      Predicate<ZooUtil.LockID> isLockHeld, int maxDeferred, FateIdGenerator fateIdGenerator) {
 +    super(lockID, isLockHeld, maxDeferred, fateIdGenerator);
 +    this.context = Objects.requireNonNull(context);
 +    this.tableName = Objects.requireNonNull(tableName);
 +  }
 +
 +  @Override
 +  public FateId create() {
 +
 +    int attempt = 0;
 +    while (true) {
 +
 +      FateId fateId = getFateId();
 +
 +      if (attempt >= 1) {
 +        log.debug("Failed to create new id: {}, trying again", fateId);
 +        UtilWaitThread.sleep(100);
 +      }
 +
 +      var status = newMutator(fateId).requireAbsent().putStatus(TStatus.NEW)
 +          .putCreateTime(System.currentTimeMillis()).tryMutate();
 +
 +      switch (status) {
 +        case ACCEPTED:
 +          return fateId;
 +        case UNKNOWN:
 +        case REJECTED:
 +          attempt++;
 +          continue;
 +        default:
 +          throw new IllegalStateException("Unknown status " + status);
 +      }
 +    }
 +  }
 +
 +  public FateId getFateId() {
 +    return fateIdGenerator.newRandomId(type());
 +  }
 +
 +  @Override
 +  public Optional<FateId> seedTransaction(Fate.FateOperation txName, FateKey fateKey, Repo<T> repo,
 +      boolean autoCleanUp) {
 +    final var fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey);
 +    Supplier<FateMutator<T>> mutatorFactory = () -> newMutator(fateId).requireAbsent()
 +        .putKey(fateKey).putCreateTime(System.currentTimeMillis());
 +    if (seedTransaction(mutatorFactory, fateKey + " " + fateId, txName, repo, autoCleanUp)) {
 +      return Optional.of(fateId);
 +    } else {
 +      return Optional.empty();
 +    }
 +  }
 +
 +  @Override
 +  public boolean seedTransaction(Fate.FateOperation txName, FateId fateId, Repo<T> repo,
 +      boolean autoCleanUp) {
 +    Supplier<FateMutator<T>> mutatorFactory =
 +        () -> newMutator(fateId).requireStatus(TStatus.NEW).requireUnreserved().requireAbsentKey();
 +    return seedTransaction(mutatorFactory, fateId.canonical(), txName, repo, autoCleanUp);
 +  }
 +
 +  private boolean seedTransaction(Supplier<FateMutator<T>> mutatorFactory, String logId,
 +      Fate.FateOperation txName, Repo<T> repo, boolean autoCleanUp) {
 +    int maxAttempts = 5;
 +    for (int attempt = 0; attempt < maxAttempts; attempt++) {
 +      var mutator = mutatorFactory.get();
 +      mutator =
 +          mutator.putName(serializeTxInfo(txName)).putRepo(1, repo).putStatus(TStatus.SUBMITTED);
 +      if (autoCleanUp) {
 +        mutator = mutator.putAutoClean(serializeTxInfo(autoCleanUp));
 +      }
 +      var status = mutator.tryMutate();
 +      if (status == FateMutator.Status.ACCEPTED) {
 +        // signal to the super class that a new fate transaction was seeded and is ready to run
 +        seededTx();
 +        log.trace("Attempt to seed {} returned {}", logId, status);
 +        return true;
 +      } else if (status == FateMutator.Status.REJECTED) {
 +        log.debug("Attempt to seed {} returned {}", logId, status);
 +        return false;
 +      } else if (status == FateMutator.Status.UNKNOWN) {
 +        // At this point can not reliably determine if the conditional mutation was successful or
 +        // not because no reservation was acquired. For example since no reservation was acquired it
 +        // is possible that seeding was a success and something immediately picked it up and started
 +        // operating on it and changing it, so scanning after that point can not conclude success or
 +        // failure. Another situation is that maybe the fateId already existed in a seeded form
 +        // prior to getting this unknown.
 +        log.debug("Attempt to seed {} returned {} status, retrying", logId, status);
 +        UtilWaitThread.sleep(250);
 +      }
 +    }
 +
 +    log.warn("Repeatedly received unknown status when attempting to seed {}", logId);
 +    return false;
 +  }
 +
 +  @Override
 +  public Optional<FateTxStore<T>> tryReserve(FateId fateId) {
 +    verifyLock(lockID, fateId);
 +    // Create a unique FateReservation for this reservation attempt
 +    FateReservation reservation = FateReservation.from(lockID, UUID.randomUUID());
 +
 +    // requiring any status prevents creating an entry if the fate id doesn't exist
 +    FateMutator.Status status =
 +        newMutator(fateId).requireStatus(TStatus.values()).putReservedTx(reservation).tryMutate();
 +    if (status.equals(FateMutator.Status.ACCEPTED)) {
 +      return Optional.of(new FateTxStoreImpl(fateId, reservation));
 +    } else if (status.equals(FateMutator.Status.UNKNOWN)) {
 +      // If the status is UNKNOWN, this means an error occurred after the mutation was
 +      // sent to the TabletServer, and it is unknown if the mutation was written. We
 +      // need to check if the mutation was written and if it was written by this
 +      // attempt at reservation. If it was written by this reservation attempt,
 +      // we can return the FateTxStore since it was successfully reserved in this
 +      // attempt, otherwise we return empty (was written by another reservation
 +      // attempt or was not written at all).
 +      try (Scanner scanner = context.createScanner(tableName, Authorizations.EMPTY)) {
 +        scanner.setRange(getRow(fateId));
 +        scanner.fetchColumn(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
 +            TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier());
 +        FateReservation persistedRes =
 +            scanner.stream().map(entry -> FateReservation.deserialize(entry.getValue().get()))
 +                .findFirst().orElse(null);
 +        if (persistedRes != null && persistedRes.equals(reservation)) {
 +          return Optional.of(new FateTxStoreImpl(fateId, reservation));
 +        }
 +      } catch (TableNotFoundException e) {
 +        throw new IllegalStateException(tableName + " not found!", e);
 +      }
 +    }
 +    return Optional.empty();
 +  }
 +
 +  @Override
 +  public void deleteDeadReservations() {
 +    for (Entry<FateId,FateReservation> activeRes : getActiveReservations().entrySet()) {
 +      FateId fateId = activeRes.getKey();
 +      FateReservation reservation = activeRes.getValue();
 +      if (!isLockHeld.test(reservation.getLockID())) {
 +        var status = newMutator(fateId).putUnreserveTx(reservation).tryMutate();
 +        if (status == FateMutator.Status.ACCEPTED) {
 +          // Technically, this should also be logged for the case where the mutation status
 +          // is UNKNOWN, but the mutation was actually written (fate id was unreserved)
 +          // but there is no way to tell if it was unreserved from this mutation or another
 +          // thread simply unreserving the transaction
 +          log.trace("Deleted the dead reservation {} for fate id {}", reservation, fateId);
 +        }
 +        // No need to verify the status... If it is ACCEPTED, we have successfully unreserved
 +        // the dead transaction. If it is REJECTED, the reservation has changed (i.e.,
 +        // has been unreserved so no need to do anything, or has been unreserved and reserved
 +        // again in which case we don't want to change it). If it is UNKNOWN, the mutation
 +        // may or may not have been written. If it was written, we have successfully unreserved
 +        // the dead transaction. If it was not written, the next cycle/call to
 +        // deleteDeadReservations() will try again.
 +      }
 +    }
 +  }
 +
 +  @Override
 +  protected Stream<FateIdStatus> getTransactions(EnumSet<TStatus> statuses) {
 +    try {
 +      Scanner scanner = context.createScanner(tableName, Authorizations.EMPTY);
 +      scanner.setRange(new Range());
 +      RowFateStatusFilter.configureScanner(scanner, statuses);
 +      TxColumnFamily.STATUS_COLUMN.fetch(scanner);
 +      TxColumnFamily.RESERVATION_COLUMN.fetch(scanner);
 +      return scanner.stream().onClose(scanner::close).map(e -> {
 +        String txUUIDStr = e.getKey().getRow().toString();
 +        FateId fateId = FateId.from(fateInstanceType, txUUIDStr);
 +        SortedMap<Key,Value> rowMap;
 +        TStatus status = TStatus.UNKNOWN;
 +        FateReservation reservation = null;
 +        try {
 +          rowMap = WholeRowIterator.decodeRow(e.getKey(), e.getValue());
 +        } catch (IOException ex) {
 +          throw new RuntimeException(ex);
 +        }
 +        // expect status and optionally reservation
 +        Preconditions.checkState(rowMap.size() == 1 || rowMap.size() == 2,
 +            "Invalid row seen: %s. Expected to see one entry for the status 
and optionally an "
 +                + "entry for the fate reservation",
 +            rowMap);
 +        for (Entry<Key,Value> entry : rowMap.entrySet()) {
 +          Text colf = entry.getKey().getColumnFamily();
 +          Text colq = entry.getKey().getColumnQualifier();
 +          Value val = entry.getValue();
 +          switch (colq.toString()) {
 +            case TxColumnFamily.STATUS:
 +              status = TStatus.valueOf(val.toString());
 +              break;
 +            case TxColumnFamily.RESERVATION:
 +              reservation = FateReservation.deserialize(val.get());
 +              break;
 +            default:
 +              throw new IllegalStateException("Unexpected column seen: " + colf + ":" + colq);
 +          }
 +        }
 +        final TStatus finalStatus = status;
 +        final Optional<FateReservation> finalReservation = Optional.ofNullable(reservation);
 +        return new FateIdStatusBase(fateId) {
 +          @Override
 +          public TStatus getStatus() {
 +            return finalStatus;
 +          }
 +
 +          @Override
 +          public Optional<FateReservation> getFateReservation() {
 +            return finalReservation;
 +          }
 +        };
 +      });
 +    } catch (TableNotFoundException e) {
 +      throw new IllegalStateException(tableName + " not found!", e);
 +    }
 +  }
 +
 +  @Override
 +  public Stream<FateKey> list(FateKey.FateKeyType type) {
 +    try {
 +      Scanner scanner = context.createScanner(tableName, Authorizations.EMPTY);
 +      scanner.setRange(new Range());
 +      TxColumnFamily.TX_KEY_COLUMN.fetch(scanner);
 +      FateKeyFilter.configureScanner(scanner, type);
 +      return scanner.stream().onClose(scanner::close)
 +          .map(e -> FateKey.deserialize(e.getValue().get()));
 +    } catch (TableNotFoundException e) {
 +      throw new IllegalStateException(tableName + " not found!", e);
 +    }
 +  }
 +
 +  @Override
 +  protected TStatus _getStatus(FateId fateId) {
 +    return scanTx(scanner -> {
 +      scanner.setRange(getRow(fateId));
 +      TxColumnFamily.STATUS_COLUMN.fetch(scanner);
 +      return scanner.stream().map(e -> TStatus.valueOf(e.getValue().toString())).findFirst()
 +          .orElse(TStatus.UNKNOWN);
 +    });
 +  }
 +
 +  @Override
 +  protected Optional<FateKey> getKey(FateId fateId) {
 +    return scanTx(scanner -> {
 +      scanner.setRange(getRow(fateId));
 +      TxColumnFamily.TX_KEY_COLUMN.fetch(scanner);
 +      return scanner.stream().map(e -> FateKey.deserialize(e.getValue().get())).findFirst();
 +    });
 +  }
 +
 +  @Override
 +  protected FateTxStore<T> newUnreservedFateTxStore(FateId fateId) {
 +    return new FateTxStoreImpl(fateId);
 +  }
 +
 +  static Range getRow(FateId fateId) {
 +    return new Range(getRowId(fateId));
 +  }
 +
 +  public static String getRowId(FateId fateId) {
 +    return fateId.getTxUUIDStr();
 +  }
 +
 +  private FateMutatorImpl<T> newMutator(FateId fateId) {
 +    return new FateMutatorImpl<>(context, tableName, fateId);
 +  }
 +
 +  private <R> R scanTx(Function<Scanner,R> func) {
 +    try (Scanner scanner = context.createScanner(tableName, Authorizations.EMPTY)) {
 +      return func.apply(scanner);
 +    } catch (TableNotFoundException e) {
 +      throw new IllegalStateException(tableName + " not found!", e);
 +    }
 +  }
 +
 +  @Override
 +  public FateInstanceType type() {
 +    return fateInstanceType;
 +  }
 +
 +  private class FateTxStoreImpl extends AbstractFateTxStoreImpl {
 +
 +    private FateTxStoreImpl(FateId fateId) {
 +      super(fateId);
 +    }
 +
 +    private FateTxStoreImpl(FateId fateId, FateReservation reservation) {
 +      super(fateId, reservation);
 +    }
 +
 +    @Override
 +    public Repo<T> top() {
 +      verifyReservedAndNotDeleted(false);
 +
 +      return scanTx(scanner -> {
 +        scanner.setRange(getRow(fateId));
 +        scanner.setBatchSize(1);
 +        scanner.fetchColumnFamily(RepoColumnFamily.NAME);
 +        return scanner.stream().map(e -> {
 +          @SuppressWarnings("unchecked")
 +          var repo = (Repo<T>) deserialize(e.getValue().get());
 +          return repo;
 +        }).findFirst().orElse(null);
 +      });
 +    }
 +
 +    @Override
 +    public List<ReadOnlyRepo<T>> getStack() {
 +      verifyReservedAndNotDeleted(false);
 +
 +      return scanTx(scanner -> {
 +        scanner.setRange(getRow(fateId));
 +        scanner.fetchColumnFamily(RepoColumnFamily.NAME);
 +        return scanner.stream().map(e -> {
 +          @SuppressWarnings("unchecked")
 +          var repo = (ReadOnlyRepo<T>) deserialize(e.getValue().get());
 +          return repo;
 +        }).collect(Collectors.toList());
 +      });
 +    }
 +
 +    @Override
 +    public Serializable getTransactionInfo(TxInfo txInfo) {
 +      verifyReservedAndNotDeleted(false);
 +
 +      try (Scanner scanner = context.createScanner(tableName, Authorizations.EMPTY)) {
 +        scanner.setRange(getRow(fateId));
 +
 +        final ColumnFQ cq;
 +        switch (txInfo) {
 +          case TX_NAME:
 +            cq = TxInfoColumnFamily.TX_NAME_COLUMN;
 +            break;
 +          case AUTO_CLEAN:
 +            cq = TxInfoColumnFamily.AUTO_CLEAN_COLUMN;
 +            break;
 +          case EXCEPTION:
 +            cq = TxInfoColumnFamily.EXCEPTION_COLUMN;
 +            break;
 +          case RETURN_VALUE:
 +            cq = TxInfoColumnFamily.RETURN_VALUE_COLUMN;
 +            break;
 +          case TX_AGEOFF:
 +            cq = TxInfoColumnFamily.TX_AGEOFF_COLUMN;
 +            break;
 +          default:
 +            throw new IllegalArgumentException("Unexpected TxInfo type " + txInfo);
 +        }
 +        scanner.fetchColumn(cq.getColumnFamily(), cq.getColumnQualifier());
 +
 +        return scanner.stream().map(e -> deserializeTxInfo(txInfo, e.getValue().get())).findFirst()
 +            .orElse(null);
 +      } catch (TableNotFoundException e) {
 +        throw new IllegalStateException(tableName + " not found!", e);
 +      }
 +    }
 +
 +    @Override
 +    public long timeCreated() {
 +      verifyReservedAndNotDeleted(false);
 +
 +      return scanTx(scanner -> {
 +        scanner.setRange(getRow(fateId));
 +        TxColumnFamily.CREATE_TIME_COLUMN.fetch(scanner);
 +        return scanner.stream().map(e -> Long.parseLong(e.getValue().toString())).findFirst()
 +            .orElse(0L);
 +      });
 +    }
 +
 +    @Override
 +    public void push(Repo<T> repo) throws StackOverflowException {
 +      verifyReservedAndNotDeleted(true);
 +
 +      Optional<Integer> top = findTop();
 +
 +      if (top.filter(t -> t >= MAX_REPOS).isPresent()) {
 +        throw new StackOverflowException("Repo stack size too large");
 +      }
 +
 +      FateMutator<T> fateMutator =
 +          newMutator(fateId).requireStatus(REQ_PUSH_STATUS.toArray(TStatus[]::new));
 +      fateMutator.putRepo(top.map(t -> t + 1).orElse(1), repo).mutate();
 +    }
 +
 +    @Override
 +    public void pop() {
 +      verifyReservedAndNotDeleted(true);
 +
 +      Optional<Integer> top = findTop();
 +      top.ifPresent(t -> newMutator(fateId).requireStatus(REQ_POP_STATUS.toArray(TStatus[]::new))
 +          .deleteRepo(t).mutate());
 +    }
 +
 +    @Override
 +    public void setStatus(TStatus status) {
 +      verifyReservedAndNotDeleted(true);
 +
 +      newMutator(fateId).putStatus(status).mutate();
 +      observedStatus = status;
 +    }
 +
 +    @Override
 +    public void setTransactionInfo(TxInfo txInfo, Serializable so) {
 +      verifyReservedAndNotDeleted(true);
 +
 +      final byte[] serialized = serializeTxInfo(so);
 +
 +      newMutator(fateId).putTxInfo(txInfo, serialized).mutate();
 +    }
 +
 +    @Override
 +    public void delete() {
 +      verifyReservedAndNotDeleted(true);
 +
 +      var mutator = newMutator(fateId);
 +      mutator.requireStatus(REQ_DELETE_STATUS.toArray(TStatus[]::new));
 +      mutator.delete().mutate();
 +      this.deleted = true;
 +    }
 +
 +    @Override
 +    public void forceDelete() {
 +      verifyReservedAndNotDeleted(true);
 +
 +      var mutator = newMutator(fateId);
 +      mutator.requireStatus(REQ_FORCE_DELETE_STATUS.toArray(TStatus[]::new));
 +      mutator.delete().mutate();
 +      this.deleted = true;
 +    }
 +
 +    private Optional<Integer> findTop() {
 +      return scanTx(scanner -> {
 +        scanner.setRange(getRow(fateId));
 +        scanner.setBatchSize(1);
 +        scanner.fetchColumnFamily(RepoColumnFamily.NAME);
 +        return scanner.stream().map(e -> restoreRepo(e.getKey().getColumnQualifier())).findFirst();
 +      });
 +    }
 +
 +    @Override
 +    protected void unreserve() {
 +      if (!deleted) {
 +        FateMutator.Status status;
 +        do {
 +          status = newMutator(fateId).putUnreserveTx(reservation).tryMutate();
 +        } while (status.equals(FateMutator.Status.UNKNOWN));
 +      }
 +      reservation = null;
 +    }
 +  }
 +
 +  static Text invertRepo(int position) {
 +    Preconditions.checkArgument(REPO_RANGE.contains(position),
 +        "Position %s is not in the valid range of [0,%s]", position, 
MAX_REPOS);
 +    return new Text(String.format("%02d", MAX_REPOS - position));
 +  }
 +
 +  static Integer restoreRepo(Text invertedPosition) {
 +    int position = MAX_REPOS - Integer.parseInt(invertedPosition.toString());
 +    Preconditions.checkArgument(REPO_RANGE.contains(position),
 +        "Position %s is not in the valid range of [0,%s]", position, 
MAX_REPOS);
 +    return position;
 +  }
 +}
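
A note on the invertRepo/restoreRepo pair at the end of the class: repo positions are stored inverted and zero-padded so that the highest position sorts first lexicographically, which is why top() and findTop() can scan with a batch size of 1 and take the first entry. A small sketch, assuming for illustration that MAX_REPOS is 100:

    // position 1 -> "99", position 2 -> "98", position 3 -> "97", so a
    // lexicographic scan over RepoColumnFamily yields the top of the stack first.
    int maxRepos = 100; // assumed value, for illustration only
    String qualifier = String.format("%02d", maxRepos - 3); // "97"
    int restored = maxRepos - Integer.parseInt(qualifier);  // 3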
diff --cc core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
index cf2ea3bb82,b212356a4a..c8e5ae33be
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
@@@ -256,59 -261,6 +256,60 @@@ public interface Ample 
      void close();
    }
  
 +  interface ConditionalResult {
 +
 +    /**
-      * This enum was created instead of using {@link ConditionalWriter.Status} because Ample has
-      * automated handling for most of the statuses of the conditional writer and therefore only a
-      * subset are expected to be passed out of Ample. This enum represents the subset that Ample
-      * will actually return.
++     * This enum was created instead of using
++     * {@link org.apache.accumulo.core.client.ConditionalWriter.Status} because Ample has automated
++     * handling for most of the statuses of the conditional writer and therefore only a subset are
++     * expected to be passed out of Ample. This enum represents the subset that Ample will actually
++     * return.
 +     */
 +    enum Status {
 +      ACCEPTED, REJECTED
 +    }
 +
 +    /**
 +     * Returns the status of the conditional mutation or may return a computed status of ACCEPTED in
 +     * some cases, see {@link ConditionalTabletMutator#submit(RejectionHandler)} for details.
 +     */
 +    Status getStatus();
 +
 +    KeyExtent getExtent();
 +
 +    /**
 +     * This can only be called when {@link #getStatus()} returns something other than
 +     * {@link Status#ACCEPTED}. It reads that tablet's metadata for a failed conditional mutation.
 +     * This can be used to see why it was not accepted.
 +     */
 +    TabletMetadata readMetadata();
 +  }
 +
 +  interface AsyncConditionalTabletsMutator extends AutoCloseable {
 +    /**
 +     * @return A fluent interface for conditionally mutating a tablet. Ensure you call
 +     *         {@link ConditionalTabletMutator#submit(RejectionHandler)} when finished.
 +     */
 +    OperationRequirements mutateTablet(KeyExtent extent);
 +
 +    /**
 +     * Closing ensures that all mutations are processed and their results are reported.
 +     */
 +    @Override
 +    void close();
 +  }
 +
 +  interface ConditionalTabletsMutator extends AsyncConditionalTabletsMutator {
 +
 +    /**
 +     * After creating one or more conditional mutations using {@link #mutateTablet(KeyExtent)}, call
 +     * this method to process them using a {@link ConditionalWriter}
 +     *
 +     * @return The result from the {@link ConditionalWriter} of processing each tablet.
 +     */
 +    Map<KeyExtent,ConditionalResult> process();
 +  }
 +
    /**
     * Interface for changing a tablet's persistent data.
     */
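
For context, a hedged sketch (not part of this commit) of consuming process() results under the contract documented above; the mutator setup is elided and the surrounding variables are hypothetical:

    // Only non-ACCEPTED results need inspection; readMetadata() rereads the
    // tablet's metadata so the caller can see why a mutation was rejected.
    Map<KeyExtent,Ample.ConditionalResult> results = mutator.process();
    results.forEach((extent, result) -> {
      if (result.getStatus() == Ample.ConditionalResult.Status.REJECTED) {
        TabletMetadata metadata = result.readMetadata();
        // decide whether to retry based on the current tablet metadata
      }
    });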
diff --cc core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlanImpl.java
index 49b8eaa53c,0f26f9492b..32b23bdb20
--- a/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlanImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlanImpl.java
@@@ -55,14 -56,14 +55,11 @@@ public class CompactionPlanImpl impleme
  
      private final CompactionKind kind;
      private final ArrayList<CompactionJob> jobs = new ArrayList<>();
--    private final Set<CompactableFile> allFiles;
      private final Set<CompactableFile> seenFiles = new HashSet<>();
      private final Set<CompactableFile> candidates;
  
--    public BuilderImpl(CompactionKind kind, Set<CompactableFile> allFiles,
--        Set<CompactableFile> candidates) {
++    public BuilderImpl(CompactionKind kind, Set<CompactableFile> candidates) {
        this.kind = kind;
--      this.allFiles = allFiles;
        this.candidates = candidates;
      }
  
diff --cc core/src/main/java/org/apache/accumulo/core/zookeeper/ZooCache.java
index ff30277797,0000000000..791e8d85bc
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooCache.java
+++ b/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooCache.java
@@@ -1,589 -1,0 +1,580 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.core.zookeeper;
 +
 +import static java.nio.charset.StandardCharsets.UTF_8;
 +import static java.util.Objects.requireNonNull;
 +import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
 +
 +import java.time.Duration;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.List;
 +import java.util.NavigableSet;
 +import java.util.Optional;
 +import java.util.Set;
 +import java.util.TreeSet;
 +import java.util.concurrent.ConcurrentMap;
 +import java.util.concurrent.atomic.AtomicLong;
 +import java.util.function.Consumer;
 +import java.util.function.Predicate;
 +
 +import org.apache.accumulo.core.lock.ServiceLock;
 +import org.apache.accumulo.core.lock.ServiceLockData;
 +import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
 +import org.apache.accumulo.core.util.cache.Caches;
 +import org.apache.zookeeper.KeeperException;
 +import org.apache.zookeeper.KeeperException.Code;
 +import org.apache.zookeeper.WatchedEvent;
 +import org.apache.zookeeper.Watcher;
 +import org.apache.zookeeper.data.Stat;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.github.benmanes.caffeine.cache.Cache;
 +import com.github.benmanes.caffeine.cache.Ticker;
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.base.Preconditions;
 +
 +/**
 + * A cache for values stored in ZooKeeper. Values are kept up to date as they change.
 + */
 +public class ZooCache {
 +
 +  public interface ZooCacheWatcher extends Consumer<WatchedEvent> {}
 +
 +  private static final Logger log = LoggerFactory.getLogger(ZooCache.class);
 +
 +  private final NavigableSet<String> watchedPaths;
 +
 +  // visible for tests
 +  protected final ZCacheWatcher watcher = new ZCacheWatcher();
 +  private final List<ZooCacheWatcher> externalWatchers =
 +      Collections.synchronizedList(new ArrayList<>());
 +
 +  private static final AtomicLong nextCacheId = new AtomicLong(0);
 +  private final String cacheId = "ZC" + nextCacheId.incrementAndGet();
 +
 +  public static final Duration CACHE_DURATION = Duration.ofMinutes(30);
 +
 +  private final Cache<String,ZcNode> cache;
 +
 +  private final ConcurrentMap<String,ZcNode> nodeCache;
 +
 +  private final ZooSession zk;
 +
 +  private volatile boolean closed = false;
 +
 +  private final AtomicLong updateCount = new AtomicLong();
 +
 +  private final AtomicLong zkClientTracker = new AtomicLong();
 +
 +  class ZCacheWatcher implements Watcher {
 +    @Override
 +    public void process(WatchedEvent event) {
 +      if (log.isTraceEnabled()) {
 +        log.trace("{}: {}", cacheId, event);
 +      }
 +
 +      switch (event.getType()) {
 +        case NodeChildrenChanged:
 +          // According to documentation we should not receive this event.
 +          // According to https://issues.apache.org/jira/browse/ZOOKEEPER-4475 we
 +          // may receive this event (Fixed in 3.9.0)
 +          break;
 +        case ChildWatchRemoved:
 +        case DataWatchRemoved:
 +          // We don't need to do anything with the cache on these events.
 +          break;
 +        case NodeDataChanged:
 +          log.trace("{} node data changed; clearing {}", cacheId, event.getPath());
 +          clear(path -> path.equals(event.getPath()));
 +          break;
 +        case NodeCreated:
 +        case NodeDeleted:
 +          // With the Watcher being set at a higher level we need to remove
 +          // the parent of the affected node and all of its children from the cache
 +          // so that the parent and children node can be re-cached. If we only remove the
 +          // affected node, then the cached children in the parent could be incorrect.
 +          int lastSlash = event.getPath().lastIndexOf('/');
 +          String parent = lastSlash == 0 ? "/" : event.getPath().substring(0, lastSlash);
 +          log.trace("{} node created or deleted {}; clearing {}", cacheId, event.getPath(), parent);
 +          clear((path) -> path.startsWith(parent));
 +          break;
 +        case PersistentWatchRemoved:
 +          log.warn(
 +              "{} persistent watch removed {} which is only done in 
ZooSession.addPersistentRecursiveWatchers; ignoring;",
 +              cacheId, event.getPath());
 +          break;
 +        case None:
 +          switch (event.getState()) {
 +            case Closed:
 +              log.trace("{} ZooKeeper connection closed, clearing cache; {}", cacheId, event);
 +              clear();
 +              break;
 +            case Disconnected:
 +              log.trace("{} ZooKeeper connection disconnected, clearing cache; {}", cacheId, event);
 +              clear();
 +              break;
 +            case SyncConnected:
 +              log.trace("{} ZooKeeper connection established, ignoring; {}", cacheId, event);
 +              break;
 +            case Expired:
 +              log.trace("{} ZooKeeper connection expired, clearing cache; {}", cacheId, event);
 +              clear();
 +              break;
 +            default:
 +              log.warn("{} Unhandled state {}", cacheId, event);
 +              break;
 +          }
 +          break;
 +        default:
 +          log.warn("{} Unhandled event type {}", cacheId, event);
 +          break;
 +      }
 +
 +      externalWatchers.forEach(ew -> {
 +        try {
 +          ew.accept(event);
 +        } catch (Exception e) {
 +          log.error(
 +              "Exception calling external watcher. This is a bug and could 
impact proper operation.",
 +              e);
 +        }
 +      });
 +    }
 +  }
 +
 +  /**
 +   * Creates a ZooCache instance that uses the supplied ZooSession for communicating with the
 +   * instance's ZooKeeper servers. The ZooCache will create persistent watchers at the given
 +   * pathsToWatch, if any, to be updated when changes are made in ZooKeeper for nodes at or below in
 +   * the tree. If ZooCacheWatchers are added via {@code addZooCacheWatcher}, then they will be
 +   * notified when this object is notified of changes via the PersistentWatcher callback.
 +   *
 +   * @param zk ZooSession for this instance
 +   * @param pathsToWatch Paths in ZooKeeper to watch
 +   */
 +  public ZooCache(ZooSession zk, Set<String> pathsToWatch) {
 +    this(zk, pathsToWatch, Ticker.systemTicker());
 +  }
 +
 +  // visible for tests that use a Ticker
 +  public ZooCache(ZooSession zk, Set<String> pathsToWatch, Ticker ticker) {
 +    this.zk = requireNonNull(zk);
 +    // this initial value is meant to indicate watchers were never setup
 +    this.zkClientTracker.set(-1);
 +    this.cache = Caches.getInstance().createNewBuilder(Caches.CacheName.ZOO_CACHE, false)
 +        .ticker(requireNonNull(ticker)).expireAfterAccess(CACHE_DURATION).build();
 +    // The concurrent map returned by Caffeine will only allow one thread to run at a time for a
 +    // given key and ZooCache relies on that. Not all concurrent map implementations have this
 +    // behavior for their compute functions.
 +    this.nodeCache = cache.asMap();
 +    this.watchedPaths = Collections.unmodifiableNavigableSet(new TreeSet<>(pathsToWatch));
 +    setupWatchers();
 +    log.trace("{} created new cache watching {}", cacheId, pathsToWatch, new Exception());
 +  }
 +
 +  public void addZooCacheWatcher(ZooCacheWatcher watcher) {
 +    externalWatchers.add(requireNonNull(watcher));
 +  }
 +
 +  // visible for tests
 +  long getZKClientObjectVersion() {
 +    long counter = zk.getConnectionCounter();
 +    // -1 is used to signify ZK has not been setup in this code and this code assumes ZooSession will
 +    // always return something >= 0.
 +    Preconditions.checkState(counter >= 0);
 +    return counter;
 +  }
 +
 +  /**
 +   * @return true if ZK has changed; false otherwise
 +   */
 +  private boolean handleZKConnectionChange() {
 +    final long currentCount = getZKClientObjectVersion();
 +    final long oldCount = zkClientTracker.get();
 +    if (oldCount != currentCount) {
 +      return setupWatchers();
 +    }
 +    return false;
 +  }
 +
 +  // Called on construction and when ZooKeeper connection changes
 +  synchronized boolean setupWatchers() {
 +
 +    final long currentCount = getZKClientObjectVersion();
 +    final long oldCount = zkClientTracker.get();
 +
 +    if (currentCount == oldCount) {
 +      return false;
 +    }
 +
 +    for (String left : watchedPaths) {
 +      for (String right : watchedPaths) {
 +        if (!left.equals(right) && left.contains(right)) {
 +          throw new IllegalArgumentException(
 +              "Overlapping paths found in paths to watch. left: " + left + ", 
right: " + right);
 +        }
 +      }
 +    }
 +
 +    try {
 +      long zkId = zk.addPersistentRecursiveWatchers(watchedPaths, watcher);
 +      zkClientTracker.set(zkId);
 +      clear();
 +      log.trace("{} Reinitialized persistent watchers and cleared cache {}", cacheId,
 +          zkClientTracker.get());
 +      return true;
 +    } catch (KeeperException | InterruptedException e) {
 +      throw new RuntimeException("Error setting up persistent recursive watcher", e);
 +    }
 +
 +  }
 +
 +  private boolean isWatchedPath(String path) {
 +    // Check that the path is equal to, or a descendant of, a watched path
 +    var floor = watchedPaths.floor(path);
 +    return floor != null
 +        && (floor.equals("/") || floor.equals(path) || path.startsWith(floor + "/"));
 +  }
 +
 +  private void ensureWatched(String path) {
 +    if (!isWatchedPath(path)) {
 +      throw new IllegalStateException("Supplied path " + path + " is not watched by this ZooCache");
 +    }
 +  }
 +
 +  private abstract class ZooRunnable<T> {
 +    /**
 +     * Runs an operation against ZooKeeper. Retries are performed by the retry method when
 +     * KeeperExceptions occur.
 +     *
-      * Changes were made in ACCUMULO-4388 so that the run method no longer accepts Zookeeper as an
-      * argument, and instead relies on the ZooRunnable implementation to call
-      * {@link #getZooKeeper()}. Performing the call to retrieving a ZooKeeper Session after caches
-      * are checked has the benefit of limiting ZK connections and blocking as a result of obtaining
-      * these sessions.
-      *
 +     * @return T the result of the runnable
 +     */
 +    abstract T run() throws KeeperException, InterruptedException;
 +
 +    /**
-      * Retry will attempt to call the run method. Run should make a call to {@link #getZooKeeper()}
-      * after checks to cached information are made. This change, per ACCUMULO-4388 ensures that we
-      * don't create a ZooKeeper session when information is cached, and access to ZooKeeper is
-      * unnecessary.
++     * Retry will attempt to call the run method.
 +     *
 +     * @return the result of the runnable on success (i.e. no exceptions).
 +     */
 +    public T retry() {
 +
 +      int sleepTime = 100;
 +
 +      while (true) {
 +
 +        try {
 +          T result = run();
 +          if (handleZKConnectionChange()) {
 +            continue;
 +          }
 +          return result;
 +        } catch (KeeperException | ZcException e) {
 +          KeeperException ke;
 +          if (e instanceof ZcException) {
 +            ke = ((ZcException) e).getZKException();
 +          } else {
 +            ke = ((KeeperException) e);
 +          }
 +          final Code code = ke.code();
 +          if (code == Code.NONODE) {
 +            log.error("Looked up non-existent node in cache " + ke.getPath(), e);
 +          } else if (code == Code.CONNECTIONLOSS || code == Code.OPERATIONTIMEOUT
 +              || code == Code.SESSIONEXPIRED) {
 +            log.warn("Saw (possibly) transient exception communicating with ZooKeeper, will retry",
 +                e);
 +          } else {
 +            log.warn("Zookeeper error, will retry", e);
 +          }
 +        } catch (InterruptedException | ZcInterruptedException e) {
 +          log.info("Zookeeper error, will retry", e);
 +        }
 +
 +        try {
 +          // do not hold lock while sleeping
 +          Thread.sleep(sleepTime);
 +        } catch (InterruptedException e) {
 +          log.debug("Wait in retry() was interrupted.", e);
 +        }
 +        if (sleepTime < 10_000) {
 +          sleepTime = (int) (sleepTime + sleepTime * RANDOM.get().nextDouble());
 +        }
 +      }
 +    }
 +
 +  }
 +
 +  private static class ZcException extends RuntimeException {
 +    private static final long serialVersionUID = 1;
 +
 +    private ZcException(KeeperException e) {
 +      super(e);
 +    }
 +
 +    public KeeperException getZKException() {
 +      return (KeeperException) getCause();
 +    }
 +  }
 +
 +  private static class ZcInterruptedException extends RuntimeException {
 +    private static final long serialVersionUID = 1;
 +
 +    private ZcInterruptedException(InterruptedException e) {
 +      super(e);
 +    }
 +  }
 +
 +  /**
 +   * Gets the children of the given node.
 +   *
 +   * @param zPath path of node
 +   * @return children list, or null if node has no children or does not exist
 +   */
 +  public List<String> getChildren(final String zPath) {
 +    Preconditions.checkState(!closed);
 +    ensureWatched(zPath);
 +    ZooRunnable<List<String>> zr = new ZooRunnable<>() {
 +
 +      @Override
 +      public List<String> run() throws KeeperException, InterruptedException {
 +
 +        var zcNode = nodeCache.get(zPath);
 +        if (zcNode != null && zcNode.cachedChildren()) {
 +          return zcNode.getChildren();
 +        }
 +
 +        log.trace("{} {} was not in children cache, looking up in zookeeper", cacheId, zPath);
 +
 +        zcNode = nodeCache.compute(zPath, (zp, zcn) -> {
 +          // recheck the children now that lock is held on key
 +          if (zcn != null && zcn.cachedChildren()) {
 +            return zcn;
 +          }
 +          try {
 +            List<String> children = zk.getChildren(zPath, null);
 +            log.trace("{} adding {} children of {} to cache", cacheId, children.size(), zPath);
 +            return new ZcNode(children, zcn);
 +          } catch (KeeperException.NoNodeException nne) {
 +            log.trace("{} getChildren saw that {} does not exist", cacheId, zPath);
 +            return ZcNode.NON_EXISTENT;
 +          } catch (KeeperException e) {
 +            throw new ZcException(e);
 +          } catch (InterruptedException e) {
 +            throw new ZcInterruptedException(e);
 +          }
 +        });
 +        // increment this after compute call completes when the change is visible
 +        updateCount.incrementAndGet();
 +        return zcNode.getChildren();
 +      }
 +    };
 +
 +    return zr.retry();
 +  }
 +
 +  /**
 +   * Gets data at the given path. Status information is not returned.
 +   *
 +   * @param zPath path to get
 +   * @return path data, or null if non-existent
 +   */
 +  public byte[] get(final String zPath) {
 +    return get(zPath, null);
 +  }
 +
 +  /**
 +   * Gets data at the given path, filling status information into the given <code>Stat</code>
 +   * object.
 +   *
 +   * @param zPath path to get
 +   * @param status status object to populate
 +   * @return path data, or null if non-existent
 +   */
 +  public byte[] get(final String zPath, final ZcStat status) {
 +    Preconditions.checkState(!closed);
 +    ensureWatched(zPath);
 +    ZooRunnable<byte[]> zr = new ZooRunnable<>() {
 +
 +      @Override
 +      public byte[] run() throws KeeperException, InterruptedException {
 +
 +        var zcNode = nodeCache.get(zPath);
 +        if (zcNode != null && zcNode.cachedData()) {
 +          if (status != null) {
 +            copyStats(status, zcNode.getStat());
 +          }
 +          return zcNode.getData();
 +        }
 +
 +        log.trace("{} {} was not in data cache, looking up in zookeeper", cacheId, zPath);
 +
 +        zcNode = nodeCache.compute(zPath, (zp, zcn) -> {
 +          // recheck now that the lock is held on the key; it may be present now. Could have been
 +          // computed while waiting for lock.
 +          if (zcn != null && zcn.cachedData()) {
 +            return zcn;
 +          }
 +          try {
 +            byte[] data = null;
 +            Stat stat = new Stat();
 +            ZcStat zstat = null;
 +            try {
 +              data = zk.getData(zPath, null, stat);
 +              zstat = new ZcStat(stat);
 +            } catch (KeeperException.NoNodeException e1) {
 +              log.trace("{} zookeeper did not contain {}", cacheId, zPath);
 +              return ZcNode.NON_EXISTENT;
 +            } catch (InterruptedException e) {
 +              throw new ZcInterruptedException(e);
 +            }
 +            if (log.isTraceEnabled()) {
 +              log.trace("{} zookeeper contained {} {}", cacheId, zPath,
 +                  (data == null ? null : new String(data, UTF_8)));
 +            }
 +            return new ZcNode(data, zstat, zcn);
 +          } catch (KeeperException ke) {
 +            throw new ZcException(ke);
 +          }
 +        });
 +
 +        // update this after the compute call completes when the change is visible
 +        updateCount.incrementAndGet();
 +        if (status != null) {
 +          copyStats(status, zcNode.getStat());
 +        }
 +        return zcNode.getData();
 +      }
 +    };
 +
 +    return zr.retry();
 +  }
 +
 +  /**
 +   * Helper method to copy stats from the cached stat into userStat
 +   *
 +   * @param userStat user Stat object
 +   * @param cachedStat cached statistic, that is or will be cached
 +   */
 +  protected void copyStats(ZcStat userStat, ZcStat cachedStat) {
 +    Preconditions.checkState(!closed);
 +    if (userStat != null && cachedStat != null) {
 +      userStat.set(cachedStat);
 +    }
 +  }
 +
 +  /**
 +   * Clears this cache.
 +   */
 +  protected void clear() {
 +    Preconditions.checkState(!closed);
 +    nodeCache.clear();
 +    updateCount.incrementAndGet();
 +    log.trace("{} cleared all from cache", cacheId);
 +  }
 +
 +  public void close() {
 +    clear();
 +    closed = true;
 +  }
 +
 +  /**
 +   * Returns a monotonically increasing count of the number of times the cache was updated. If the
 +   * count is the same, then it means cache did not change.
 +   */
 +  public long getUpdateCount() {
 +    Preconditions.checkState(!closed);
 +    return updateCount.get();
 +  }
 +
 +  /**
 +   * Checks if a data value (or lack of one) is cached.
 +   *
 +   * @param zPath path of node
 +   * @return true if data value is cached
 +   */
 +  @VisibleForTesting
 +  public boolean dataCached(String zPath) {
 +    ensureWatched(zPath);
 +    var zcn = nodeCache.get(zPath);
 +    return zcn != null && zcn.cachedData();
 +  }
 +
 +  /**
 +   * Checks if children of a node (or lack of them) are cached.
 +   *
 +   * @param zPath path of node
 +   * @return true if children are cached
 +   */
 +  @VisibleForTesting
 +  public boolean childrenCached(String zPath) {
 +    ensureWatched(zPath);
 +    var zcn = nodeCache.get(zPath);
 +    return zcn != null && zcn.cachedChildren();
 +  }
 +
 +  /**
 +   * Removes all paths in the cache that match the predicate.
 +   */
 +  public void clear(Predicate<String> pathPredicate) {
 +    Preconditions.checkState(!closed);
 +    Predicate<String> pathPredicateWrapper = path -> {
 +      boolean testResult = pathPredicate.test(path);
 +      if (testResult) {
 +        updateCount.incrementAndGet();
 +        log.trace("{} removing {} from cache", cacheId, path);
 +      }
 +      return testResult;
 +    };
 +    nodeCache.keySet().removeIf(pathPredicateWrapper);
 +  }
 +
 +  /**
 +   * Clears this cache of all information about nodes rooted at the given path.
 +   *
 +   * @param zPath path of top node
 +   */
 +  public void clear(String zPath) {
 +    ensureWatched(zPath);
 +    clear(path -> path.startsWith(zPath));
 +  }
 +
 +  /**
 +   * Gets the lock data from the node in the cache at the specified path
 +   */
 +  public Optional<ServiceLockData> getLockData(ServiceLockPath path) {
 +    ensureWatched(path.toString());
 +    List<String> children = ServiceLock.validateAndSort(path, getChildren(path.toString()));
 +    if (children == null || children.isEmpty()) {
 +      return Optional.empty();
 +    }
 +    String lockNode = children.get(0);
 +
 +    byte[] lockData = get(path + "/" + lockNode);
 +    if (log.isTraceEnabled()) {
 +      log.trace("{} Data from lockNode {} is {}", cacheId, lockNode, new String(lockData, UTF_8));
 +    }
 +    if (lockData == null) {
 +      lockData = new byte[0];
 +    }
 +    return ServiceLockData.parse(lockData);
 +  }
 +
 +}
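
For context, the isWatchedPath check above relies on NavigableSet.floor: the greatest watched path that sorts at or before the queried path is either the path itself or its nearest watched ancestor. A small sketch with hypothetical paths:

    NavigableSet<String> watched = new TreeSet<>(Set.of("/accumulo/instance"));
    String path = "/accumulo/instance/tables";
    String floor = watched.floor(path); // "/accumulo/instance"
    boolean covered = floor != null
        && (floor.equals("/") || floor.equals(path) || path.startsWith(floor + "/"));
    // covered == true; for "/accumulo/instance2" the floor is the same but the
    // startsWith(floor + "/") test fails, so ensureWatched would reject it.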
diff --cc core/src/test/java/org/apache/accumulo/core/spi/compaction/RatioBasedCompactionPlannerTest.java
index 90fc5a4c58,55cf3e1f97..a782b69465
--- a/core/src/test/java/org/apache/accumulo/core/spi/compaction/RatioBasedCompactionPlannerTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/spi/compaction/RatioBasedCompactionPlannerTest.java
@@@ -667,9 -584,9 +667,9 @@@ public class RatioBasedCompactionPlanne
  
    private CompactionJob createJob(CompactionKind kind, Set<CompactableFile> all,
        Set<CompactableFile> files) {
--    return new CompactionPlanImpl.BuilderImpl(kind, all, all)
 -        .addJob((short) all.size(), CompactionExecutorIdImpl.internalId(csid, "small"), files)
 -        .build().getJobs().iterator().next();
++    return new CompactionPlanImpl.BuilderImpl(kind, all)
 +        .addJob((short) all.size(), CompactorGroupId.of("small"), files).build().getJobs()
 +        .iterator().next();
    }
  
    // Create a set of files whose sizes would require certain compaction ratios to compact
@@@ -817,7 -727,7 +817,7 @@@
  
        @Override
        public Builder createPlanBuilder() {
--        return new CompactionPlanImpl.BuilderImpl(kind, all, candidates);
++        return new CompactionPlanImpl.BuilderImpl(kind, candidates);
        }
      };
    }
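
For reference, the change exercised above removes BuilderImpl's unused allFiles parameter; a minimal sketch of the resulting call shape, with hypothetical kind, candidates, and files values:

    // Before: new CompactionPlanImpl.BuilderImpl(kind, allFiles, candidates)
    CompactionPlan plan = new CompactionPlanImpl.BuilderImpl(kind, candidates)
        .addJob((short) 1, CompactorGroupId.of("small"), files).build();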
diff --cc server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java
index 349dedcc97,0000000000..80e72b1561
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java
@@@ -1,342 -1,0 +1,342 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.server.compaction;
 +
 +import java.time.Duration;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.stream.Collectors;
 +
 +import org.apache.accumulo.core.client.PluginEnvironment;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
 +import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.NamespaceId;
 +import org.apache.accumulo.core.data.TableId;
 +import org.apache.accumulo.core.fate.FateId;
 +import org.apache.accumulo.core.logging.ConditionalLogger;
 +import org.apache.accumulo.core.metadata.CompactableFileImpl;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 +import org.apache.accumulo.core.spi.common.ServiceEnvironment;
 +import org.apache.accumulo.core.spi.compaction.CompactionDispatcher;
 +import org.apache.accumulo.core.spi.compaction.CompactionJob;
 +import org.apache.accumulo.core.spi.compaction.CompactionKind;
 +import org.apache.accumulo.core.spi.compaction.CompactionPlan;
 +import org.apache.accumulo.core.spi.compaction.CompactionPlanner;
 +import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
 +import org.apache.accumulo.core.spi.compaction.CompactionServices;
 +import org.apache.accumulo.core.util.cache.Caches;
 +import org.apache.accumulo.core.util.cache.Caches.CacheName;
 +import org.apache.accumulo.core.util.compaction.CompactionJobImpl;
 +import org.apache.accumulo.core.util.compaction.CompactionPlanImpl;
 +import org.apache.accumulo.core.util.compaction.CompactionPlannerInitParams;
 +import org.apache.accumulo.core.util.compaction.CompactionServicesConfig;
 +import org.apache.accumulo.core.util.time.SteadyTime;
 +import org.apache.accumulo.server.ServiceEnvironmentImpl;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +import org.slf4j.event.Level;
 +
 +import com.github.benmanes.caffeine.cache.Cache;
 +
 +public class CompactionJobGenerator {
 +  private static final Logger log = LoggerFactory.getLogger(CompactionJobGenerator.class);
 +  private static final Logger UNKNOWN_SERVICE_ERROR_LOG =
 +      new ConditionalLogger.EscalatingLogger(log, Duration.ofMinutes(5), 3000, Level.ERROR);
 +  private static final Logger PLANNING_INIT_ERROR_LOG =
 +      new ConditionalLogger.EscalatingLogger(log, Duration.ofMinutes(5), 3000, Level.ERROR);
 +  private static final Logger PLANNING_ERROR_LOG =
 +      new ConditionalLogger.EscalatingLogger(log, Duration.ofMinutes(5), 3000, Level.ERROR);
 +
 +  private final CompactionServicesConfig servicesConfig;
 +  private final Map<CompactionServiceId,CompactionPlanner> planners = new HashMap<>();
 +  private final Cache<TableId,CompactionDispatcher> dispatchers;
 +  private final Set<CompactionServiceId> serviceIds;
 +  private final PluginEnvironment env;
 +  private final Map<FateId,Map<String,String>> allExecutionHints;
 +  private final SteadyTime steadyTime;
 +
 +  public CompactionJobGenerator(PluginEnvironment env,
 +      Map<FateId,Map<String,String>> executionHints, SteadyTime steadyTime) {
 +    servicesConfig = new CompactionServicesConfig(env.getConfiguration());
 +    serviceIds = servicesConfig.getPlanners().keySet().stream().map(CompactionServiceId::of)
 +        .collect(Collectors.toUnmodifiableSet());
 +
 +    dispatchers = Caches.getInstance().createNewBuilder(CacheName.COMPACTION_DISPATCHERS, false)
 +        .maximumSize(10).build();
 +    this.env = env;
 +    if (executionHints.isEmpty()) {
 +      this.allExecutionHints = executionHints;
 +    } else {
 +      this.allExecutionHints = new HashMap<>();
 +      // Make the maps that will be passed to plugins unmodifiable. Do this once, so it does not
 +      // need to be done for each tablet.
 +      executionHints.forEach((k, v) -> allExecutionHints.put(k,
 +          v.isEmpty() ? Map.of() : Collections.unmodifiableMap(v)));
 +    }
 +
 +    this.steadyTime = steadyTime;
 +  }
 +
 +  public Collection<CompactionJob> generateJobs(TabletMetadata tablet, Set<CompactionKind> kinds) {
 +    Collection<CompactionJob> systemJobs = Set.of();
 +
 +    log.trace("Planning for {} {} {}", tablet.getExtent(), kinds, 
this.hashCode());
 +
 +    if (kinds.contains(CompactionKind.SYSTEM)) {
 +      CompactionServiceId serviceId = dispatch(CompactionKind.SYSTEM, tablet, Map.of());
 +      systemJobs = planCompactions(serviceId, CompactionKind.SYSTEM, tablet, Map.of());
 +    }
 +
 +    Collection<CompactionJob> userJobs = Set.of();
 +
 +    if (kinds.contains(CompactionKind.USER) && tablet.getSelectedFiles() != null) {
 +      var hints = allExecutionHints.get(tablet.getSelectedFiles().getFateId());
 +      if (hints != null) {
 +        CompactionServiceId serviceId = dispatch(CompactionKind.USER, tablet, hints);
 +        userJobs = planCompactions(serviceId, CompactionKind.USER, tablet, hints);
 +      }
 +    }
 +
 +    if (userJobs.isEmpty()) {
 +      return systemJobs;
 +    } else if (systemJobs.isEmpty()) {
 +      return userJobs;
 +    } else {
 +      var all = new ArrayList<CompactionJob>(systemJobs.size() + userJobs.size());
 +      all.addAll(systemJobs);
 +      all.addAll(userJobs);
 +      return all;
 +    }
 +  }
 +
 +  private CompactionServiceId dispatch(CompactionKind kind, TabletMetadata tablet,
 +      Map<String,String> executionHints) {
 +
 +    CompactionDispatcher dispatcher = dispatchers.get(tablet.getTableId(),
 +        tableId -> CompactionPluginUtils.createDispatcher((ServiceEnvironment) env, tableId));
 +
 +    CompactionDispatcher.DispatchParameters dispatchParams =
 +        new CompactionDispatcher.DispatchParameters() {
 +          @Override
 +          public CompactionServices getCompactionServices() {
 +            return () -> serviceIds;
 +          }
 +
 +          @Override
 +          public ServiceEnvironment getServiceEnv() {
 +            return (ServiceEnvironment) env;
 +          }
 +
 +          @Override
 +          public CompactionKind getCompactionKind() {
 +            return kind;
 +          }
 +
 +          @Override
 +          public Map<String,String> getExecutionHints() {
 +            return executionHints;
 +          }
 +        };
 +
 +    return dispatcher.dispatch(dispatchParams).getService();
 +  }
 +
 +  private Collection<CompactionJob> planCompactions(CompactionServiceId serviceId,
 +      CompactionKind kind, TabletMetadata tablet, Map<String,String> executionHints) {
 +
 +    if (!servicesConfig.getPlanners().containsKey(serviceId.canonical())) {
 +      UNKNOWN_SERVICE_ERROR_LOG.trace(
 +          "Table {} returned non-existent compaction service {} for compaction type {}.  Check"
 +              + " the table compaction dispatcher configuration. No compactions will happen"
 +              + " until the configuration is fixed. This log message is temporarily suppressed.",
 +          tablet.getExtent().tableId(), serviceId, kind);
 +      return Set.of();
 +    }
 +
 +    CompactionPlanner planner =
 +        planners.computeIfAbsent(serviceId, sid -> createPlanner(tablet.getTableId(), serviceId));
 +
 +    // selecting indicator
 +    // selected files
 +
 +    String ratioStr =
 +        env.getConfiguration(tablet.getTableId()).get(Property.TABLE_MAJC_RATIO.getKey());
 +    if (ratioStr == null) {
 +      ratioStr = Property.TABLE_MAJC_RATIO.getDefaultValue();
 +    }
 +
 +    double ratio = Double.parseDouble(ratioStr);
 +
 +    Set<CompactableFile> allFiles = tablet.getFilesMap().entrySet().stream()
 +        .map(entry -> new CompactableFileImpl(entry.getKey(), entry.getValue()))
 +        .collect(Collectors.toUnmodifiableSet());
 +    Set<CompactableFile> candidates;
 +
 +    if (kind == CompactionKind.SYSTEM) {
 +      if (tablet.getExternalCompactions().isEmpty() && tablet.getSelectedFiles() == null) {
 +        candidates = allFiles;
 +      } else {
 +        var tmpFiles = new HashMap<>(tablet.getFilesMap());
 +        // remove any files that are in active compactions
 +        tablet.getExternalCompactions().values().stream().flatMap(ecm -> ecm.getJobFiles().stream())
 +            .forEach(tmpFiles::remove);
 +        // remove any selected files if the user compaction has completed at least one job;
 +        // otherwise the files can be kept as candidates
 +        var selectedFiles = tablet.getSelectedFiles();
 +
 +        if (selectedFiles != null) {
 +          long selectedExpirationDuration =
 +              ConfigurationTypeHelper.getTimeInMillis(env.getConfiguration(tablet.getTableId())
 +                  .get(Property.TABLE_COMPACTION_SELECTION_EXPIRATION.getKey()));
 +
 +          // If jobs have completed, or the selection time has not expired, then remove the
 +          // selected files from the candidate list; otherwise the selection can be canceled
 +          if (selectedFiles.getCompletedJobs() > 0
 +              || (steadyTime.minus(selectedFiles.getSelectedTime()).toMillis()
 +                  < selectedExpirationDuration)) {
 +            tmpFiles.keySet().removeAll(selectedFiles.getFiles());
 +          }
 +        }
 +        candidates = tmpFiles.entrySet().stream()
 +            .map(entry -> new CompactableFileImpl(entry.getKey(), entry.getValue()))
 +            .collect(Collectors.toUnmodifiableSet());
 +      }
 +    } else if (kind == CompactionKind.USER) {
 +      var selectedFiles = new HashSet<>(tablet.getSelectedFiles().getFiles());
 +      tablet.getExternalCompactions().values().stream().flatMap(ecm -> ecm.getJobFiles().stream())
 +          .forEach(selectedFiles::remove);
 +      candidates = selectedFiles.stream()
 +          .map(file -> new CompactableFileImpl(file, tablet.getFilesMap().get(file)))
 +          .collect(Collectors.toUnmodifiableSet());
 +    } else {
 +      throw new UnsupportedOperationException();
 +    }
 +
 +    if (candidates.isEmpty()) {
 +      // there are no candidate files for compaction, so there is no reason to call the planner
 +      return Set.of();
 +    }
 +
 +    CompactionPlanner.PlanningParameters params = new CompactionPlanner.PlanningParameters() {
 +
 +      @Override
 +      public NamespaceId getNamespaceId() throws TableNotFoundException {
 +        return ((ServiceEnvironmentImpl) env).getContext().getNamespaceId(tablet.getTableId());
 +      }
 +
 +      @Override
 +      public TableId getTableId() {
 +        return tablet.getTableId();
 +      }
 +
 +      @Override
 +      public ServiceEnvironment getServiceEnvironment() {
 +        return (ServiceEnvironment) env;
 +      }
 +
 +      @Override
 +      public CompactionKind getKind() {
 +        return kind;
 +      }
 +
 +      @Override
 +      public double getRatio() {
 +        return ratio;
 +      }
 +
 +      @Override
 +      public Collection<CompactableFile> getAll() {
 +        return allFiles;
 +      }
 +
 +      @Override
 +      public Collection<CompactableFile> getCandidates() {
 +        return candidates;
 +      }
 +
 +      @Override
 +      public Collection<CompactionJob> getRunningCompactions() {
 +        var allFiles2 = tablet.getFilesMap();
 +        return tablet.getExternalCompactions().values().stream().map(ecMeta -> {
 +          Collection<CompactableFile> files = ecMeta.getJobFiles().stream()
 +              .map(f -> new CompactableFileImpl(f, allFiles2.get(f))).collect(Collectors.toList());
 +          return new CompactionJobImpl(ecMeta.getPriority(),
 +              ecMeta.getCompactionGroupId(), files, ecMeta.getKind());
 +        }).collect(Collectors.toUnmodifiableList());
 +      }
 +
 +      @Override
 +      public Map<String,String> getExecutionHints() {
 +        return executionHints;
 +      }
 +
 +      @Override
 +      public CompactionPlan.Builder createPlanBuilder() {
-         return new CompactionPlanImpl.BuilderImpl(kind, allFiles, candidates);
++        return new CompactionPlanImpl.BuilderImpl(kind, candidates);
 +      }
 +    };
 +    return planCompactions(planner, params, serviceId);
 +  }
 +
 +  private CompactionPlanner createPlanner(TableId tableId, CompactionServiceId serviceId) {
 +
 +    CompactionPlanner planner;
 +    String plannerClassName = null;
 +    Map<String,String> options = null;
 +    try {
 +      plannerClassName = servicesConfig.getPlanners().get(serviceId.canonical());
 +      options = servicesConfig.getOptions().get(serviceId.canonical());
 +      planner = env.instantiate(tableId, plannerClassName, CompactionPlanner.class);
 +      CompactionPlannerInitParams initParameters = new CompactionPlannerInitParams(serviceId,
 +          servicesConfig.getPlannerPrefix(serviceId.canonical()),
 +          servicesConfig.getOptions().get(serviceId.canonical()), (ServiceEnvironment) env);
 +      planner.init(initParameters);
 +    } catch (Exception e) {
 +      PLANNING_INIT_ERROR_LOG.trace(
 +          "Failed to create compaction planner for service:{} tableId:{} using class:{} options:{}.  Compaction "
 +              + "service will not start any new compactions until its configuration is fixed. This log message is "
 +              + "temporarily suppressed.",
 +          serviceId, tableId, plannerClassName, options, e);
 +      planner = new ProvisionalCompactionPlanner(serviceId);
 +    }
 +    return planner;
 +  }
 +
 +  private Collection<CompactionJob> planCompactions(CompactionPlanner planner,
 +      CompactionPlanner.PlanningParameters params, CompactionServiceId serviceId) {
 +    try {
 +      return planner.makePlan(params).getJobs();
 +    } catch (Exception e) {
 +      PLANNING_ERROR_LOG.trace(
 +          "Failed to plan compactions for service:{} kind:{} tableId:{} hints:{}.  Compaction service may not start any"
 +              + " new compactions until this issue is resolved. Duplicates of this log message are temporarily"
 +              + " suppressed.",
 +          serviceId, params.getKind(), params.getTableId(), params.getExecutionHints(), e);
 +      return Set.of();
 +    }
 +  }
 +}
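
The hint-wrapping in the constructor above is a one-time defensive copy. A minimal standalone sketch of the same pattern, with a hypothetical class name and String keys standing in for FateId:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    class HintWrapExample {
      // Wrap each inner hint map unmodifiably once, instead of re-wrapping on
      // every generateJobs() call; empty maps share the Map.of() singleton.
      static Map<String,Map<String,String>> wrapOnce(Map<String,Map<String,String>> hints) {
        if (hints.isEmpty()) {
          return hints;
        }
        Map<String,Map<String,String>> wrapped = new HashMap<>();
        hints.forEach((k, v) -> wrapped.put(k,
            v.isEmpty() ? Map.of() : Collections.unmodifiableMap(v)));
        return wrapped;
      }
    }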
