This is an automated email from the ASF dual-hosted git repository.

ddanielr pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 3f3da5a5da9cf500777d4c4b0d5a1c6556557dd8
Merge: 5bec92248c 9817474333
Author: Daniel Roberts <ddani...@gmail.com>
AuthorDate: Fri Mar 1 01:21:57 2024 +0000

    Merge branch 'main' into elasticity

 .../core/clientImpl/InstanceOperationsImpl.java    |  10 +-
 .../core/clientImpl/NamespaceOperationsImpl.java   |   9 +-
 .../core/clientImpl/TableOperationsImpl.java       |  14 +-
 .../TabletServerBatchReaderIterator.java           |   8 +-
 .../core/clientImpl/TabletServerBatchWriter.java   |   9 +-
 .../accumulo/core/clientImpl/ThriftScanner.java    |   8 +-
 .../accumulo/core/clientImpl/bulk/BulkImport.java  |   9 +-
 .../accumulo/core/fate/zookeeper/ZooReader.java    |   8 +-
 .../spi/compaction/SimpleCompactionDispatcher.java |   5 +-
 .../java/org/apache/accumulo/core/util/Retry.java  | 112 +++++++-------
 .../org/apache/accumulo/core/util/RetryTest.java   | 163 +++++++++++----------
 .../hadoopImpl/mapred/AccumuloRecordReader.java    |  10 +-
 .../hadoopImpl/mapreduce/AccumuloRecordReader.java |  10 +-
 .../server/compaction/RetryableThriftCall.java     |   8 +-
 .../metadata/ConditionalTabletsMutatorImpl.java    |  11 +-
 .../java/org/apache/accumulo/manager/Manager.java  |   7 +-
 .../coordinator/CompactionCoordinator.java         |   8 +-
 .../coordinator/commit/CommitCompaction.java       |  10 +-
 .../tableOps/bulkVer2/CleanUpBulkImport.java       |  11 +-
 .../manager/tableOps/bulkVer2/TabletRefresher.java |  10 +-
 .../manager/tableOps/compact/CompactionDriver.java |  10 +-
 .../org/apache/accumulo/tserver/TabletServer.java  |  19 +--
 .../accumulo/tserver/session/SessionManager.java   |   9 +-
 23 files changed, 232 insertions(+), 246 deletions(-)
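
Most of the churn in this merge is mechanical: call sites of org.apache.accumulo.core.util.Retry were updated because the builder now takes java.time.Duration instead of (long, TimeUnit) pairs. A condensed before/after sketch of the pattern, using only the builder methods that appear in the hunks below:

    // Before the merge (main branch): intervals as (long, TimeUnit) pairs.
    Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS)
        .incrementBy(100, MILLISECONDS).maxWait(2, SECONDS).backOffFactor(1.5)
        .logInterval(3, MINUTES).createRetry();

    // After the merge: the same schedule expressed with java.time.Duration.
    Retry retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100))
        .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(2)).backOffFactor(1.5)
        .logInterval(Duration.ofMinutes(3)).createRetry();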

diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
index 6e558b9e3b,8435df2a40..bd708d2b19
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
@@@ -23,19 -23,10 +23,18 @@@ import static com.google.common.util.co
  import static java.nio.charset.StandardCharsets.UTF_8;
  import static java.util.Objects.requireNonNull;
  import static java.util.concurrent.TimeUnit.MILLISECONDS;
- import static java.util.concurrent.TimeUnit.MINUTES;
  import static java.util.concurrent.TimeUnit.SECONDS;
  import static java.util.stream.Collectors.toSet;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.AVAILABILITY;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.DIR;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.HOSTING_REQUESTED;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LAST;
  import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOGS;
  import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SUSPEND;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.TIME;
  import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
  import static org.apache.accumulo.core.util.Validators.EXISTING_TABLE_NAME;
  import static org.apache.accumulo.core.util.Validators.NEW_TABLE_NAME;
@@@ -1876,74 -1882,26 +1876,74 @@@ public class TableOperationsImpl extend
        rangeList = new ArrayList<>(ranges);
      }
  
 -    Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<>();
 -
 +    ClientTabletCache locator = ClientTabletCache.getInstance(context, tableId);
      locator.invalidateCache();
  
-     Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS)
-         .incrementBy(100, MILLISECONDS).maxWait(2, SECONDS).backOffFactor(1.5)
-         .logInterval(3, MINUTES).createRetry()
+     Retry retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100))
+         .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(2)).backOffFactor(1.5)
+         .logInterval(Duration.ofMinutes(3)).createRetry();
  
 -    while (!locator.binRanges(context, rangeList, binnedRanges).isEmpty()) {
 -      context.requireTableExists(tableId, tableName);
 -      context.requireNotOffline(tableId, tableName);
 -      binnedRanges.clear();
 -      try {
 -        retry.waitForNextAttempt(log,
 -            String.format("locating tablets in table %s(%s) for %d ranges", tableName, tableId,
 -                rangeList.size()));
 -      } catch (InterruptedException e) {
 -        throw new IllegalStateException(e);
 +    final ArrayList<KeyExtent> locationLess = new ArrayList<>();
 +    final Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<>();
 +    final AtomicBoolean foundOnDemandTabletInRange = new AtomicBoolean(false);
 +
 +    BiConsumer<CachedTablet,Range> rangeConsumer = (cachedTablet, range) -> {
 +      // We want tablets that are currently hosted (location present) and
 +      // their tablet availability is HOSTED (not OnDemand)
 +      if (cachedTablet.getAvailability() != TabletAvailability.HOSTED) {
 +        foundOnDemandTabletInRange.set(true);
 +      } else if (cachedTablet.getTserverLocation().isPresent()
 +          && cachedTablet.getAvailability() == TabletAvailability.HOSTED) {
 +        ClientTabletCacheImpl.addRange(binnedRanges, cachedTablet, range);
 +      } else {
 +        locationLess.add(cachedTablet.getExtent());
 +      }
 +    };
 +
 +    try {
 +
 +      List<Range> failed =
 +          locator.findTablets(context, rangeList, rangeConsumer, LocationNeed.NOT_REQUIRED);
 +
 +      if (foundOnDemandTabletInRange.get()) {
 +        throw new AccumuloException(
 +            "TableOperations.locate() only works with tablets that have an availability of "
 +                + TabletAvailability.HOSTED
 +                + ". Tablets with other availabilities were seen.  table:" + tableName
 +                + " table id:" + tableId);
 +      }
 +
 +      while (!failed.isEmpty() || !locationLess.isEmpty()) {
 +
 +        context.requireTableExists(tableId, tableName);
 +        context.requireNotOffline(tableId, tableName);
 +
 +        if (foundOnDemandTabletInRange.get()) {
 +          throw new AccumuloException(
 +              "TableOperations.locate() only works with tablets that have a tablet availability of "
 +                  + TabletAvailability.HOSTED
 +                  + ". Tablets with other availabilities were seen.  table:" + tableName
 +                  + " table id:" + tableId);
 +        }
 +
 +        try {
 +          retry.waitForNextAttempt(log,
 +              String.format("locating tablets in table %s(%s) for %d ranges", tableName, tableId,
 +                  rangeList.size()));
 +        } catch (InterruptedException e) {
 +          throw new IllegalStateException(e);
 +        }
 +
 +        locationLess.clear();
 +        binnedRanges.clear();
 +        foundOnDemandTabletInRange.set(false);
 +        locator.invalidateCache();
 +        failed = locator.findTablets(context, rangeList, rangeConsumer, LocationNeed.NOT_REQUIRED);
        }
 -      locator.invalidateCache();
 +
 +    } catch (InvalidTabletHostingRequestException e) {
 +      throw new AccumuloException("findTablets requested tablet hosting when it should not have",
 +          e);
      }
  
      return new LocationsImpl(binnedRanges);
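
For context, the client-visible effect of the rewritten locate() above: it now only succeeds when every tablet covered by the ranges has a tablet availability of HOSTED. A hedged usage sketch; the client properties path, table name, and range here are illustrative, not from this commit:

    try (AccumuloClient client = Accumulo.newClient().from("/path/to/client.properties").build()) {
      Locations locations = client.tableOperations().locate("mytable", List.of(new Range()));
      locations.groupByTablet().forEach((tablet, ranges) -> System.out.println(tablet + " " + ranges));
    } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) {
      // AccumuloException now also signals tablets whose availability is not HOSTED
    }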
diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
index 324e78fe53,5e71087ce8..b95f256786
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
@@@ -257,14 -249,10 +255,14 @@@ public class TabletServerBatchReaderIte
  
      int lastFailureSize = Integer.MAX_VALUE;
  
-     Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS)
-         .incrementBy(100, MILLISECONDS).maxWait(10, SECONDS).backOffFactor(1.07)
-         .logInterval(1, MINUTES).createFactory().createRetry();
+     Retry retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100))
+         .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofMinutes(10)).backOffFactor(1.07)
+         .logInterval(Duration.ofMinutes(1)).createFactory().createRetry();
  
 +    ScanServerData ssd;
 +
 +    long startTime = System.currentTimeMillis();
 +
      while (true) {
  
        binnedRanges.clear();
diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
index 46d5be5e89,87726fd639..428a006575
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
@@@ -306,61 -259,10 +304,61 @@@ public class ThriftScanner 
        if (useScanServer) {
          scanAttempts = new ScanServerAttemptsImpl();
        }
 +
 +      this.tabletsScanned = 0;
 +    }
 +
 +    long startTimeNanos = 0;
 +    long getNextScanAddressTimeNanos = 0;
 +
 +    public void incrementTabletsScanned(KeyExtent extent) {
 +      if (!extent.equals(prevExtent)) {
 +        tabletsScanned++;
 +        prevExtent = extent;
 +      }
 +    }
 +  }
 +
 +  static <T> Optional<T> waitUntil(Supplier<Optional<T>> condition, Duration maxWaitTime,
 +      String description, Duration timeoutLeft, ClientContext context, TableId tableId,
 +      Logger log) {
 +
-     Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS)
-         .incrementBy(100, MILLISECONDS).maxWait(1, SECONDS).backOffFactor(1.5)
-         .logInterval(3, MINUTES).createRetry();
++    Retry retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100))
++        .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(1)).backOffFactor(1.5)
++        .logInterval(Duration.ofMinutes(3)).createRetry();
 +
 +    long startTime = System.nanoTime();
 +    Optional<T> optional = condition.get();
 +    while (optional.isEmpty()) {
 +      log.trace("For tableId {} scan server selector is waiting for '{}'", tableId, description);
 +
 +      var elapsedTime = Duration.ofNanos(System.nanoTime() - startTime);
 +
 +      if (elapsedTime.compareTo(timeoutLeft) > 0) {
 +        throw new TimedOutException("While waiting for '" + description
 +            + "' in order to select a scan server, the scan timed out. ");
 +      }
 +
 +      if (elapsedTime.compareTo(maxWaitTime) > 0) {
 +        return Optional.empty();
 +      }
 +
 +      context.requireNotDeleted(tableId);
 +
 +      try {
 +        retry.waitForNextAttempt(log, String.format(
 +            "For tableId %s scan server selector is waiting for '%s'", tableId, description));
 +      } catch (InterruptedException e) {
 +        throw new RuntimeException(e);
 +      }
 +
 +      optional = condition.get();
      }
 +
 +    return optional;
    }
  
 -  public static class ScanTimedOutException extends IOException {
 +  public static class ScanTimedOutException extends TimedOutException {
  
      private static final long serialVersionUID = 1L;
  
diff --cc hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AccumuloRecordReader.java
index 53a65676e2,09bb4539a6..463d1355ec
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AccumuloRecordReader.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AccumuloRecordReader.java
@@@ -26,6 -23,6 +23,7 @@@ import static org.apache.accumulo.core.
  
  import java.io.IOException;
  import java.net.InetAddress;
++import java.time.Duration;
  import java.util.ArrayList;
  import java.util.Collection;
  import java.util.HashMap;
@@@ -354,51 -346,18 +352,51 @@@ public abstract class AccumuloRecordRea
              // tablets... so clear it
              tl.invalidateCache();
  
 -            while (!tl.binRanges(context, ranges, binnedRanges).isEmpty()) {
 -              context.requireNotDeleted(tableId);
 -              context.requireNotOffline(tableId, tableName);
 -              binnedRanges.clear();
 -              log.warn("Unable to locate bins for specified ranges. Retrying.");
 -              // sleep randomly between 100 and 200 ms
 -              sleepUninterruptibly(100 + RANDOM.get().nextInt(100), TimeUnit.MILLISECONDS);
 -              tl.invalidateCache();
 +            if (InputConfigurator.getConsistencyLevel(callingClass, job)
 +                == ConsistencyLevel.IMMEDIATE) {
 +              while (!tl.binRanges(context, ranges, binnedRanges).isEmpty()) {
 +                context.requireNotDeleted(tableId);
 +                context.requireNotOffline(tableId, tableName);
 +                binnedRanges.clear();
 +                log.warn("Unable to locate bins for specified ranges. Retrying.");
 +                // sleep randomly between 100 and 200 ms
 +                sleepUninterruptibly(100 + RANDOM.get().nextInt(100), TimeUnit.MILLISECONDS);
 +                tl.invalidateCache();
 +              }
 +            } else {
 +              Map<String,Map<KeyExtent,List<Range>>> unhostedRanges = new HashMap<>();
 +              unhostedRanges.put("", new HashMap<>());
 +              BiConsumer<CachedTablet,Range> consumer = (ct, r) -> {
 +                unhostedRanges.get("").computeIfAbsent(ct.getExtent(), k -> new ArrayList<>())
 +                    .add(r);
 +              };
 +              List<Range> failures =
 +                  tl.findTablets(context, ranges, consumer, LocationNeed.NOT_REQUIRED);
 +
-               Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS)
-                   .incrementBy(100, MILLISECONDS).maxWait(2, SECONDS).backOffFactor(1.5)
-                   .logInterval(3, MINUTES).createRetry();
++              Retry retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100))
++                  .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(2))
++                  .backOffFactor(1.5).logInterval(Duration.ofMinutes(3)).createRetry();
 +
 +              while (!failures.isEmpty()) {
 +
 +                context.requireNotDeleted(tableId);
 +
 +                try {
 +                  retry.waitForNextAttempt(log,
 +                      String.format("locating tablets in table %s(%s) for %d ranges", tableName,
 +                          tableId, ranges.size()));
 +                } catch (InterruptedException e) {
 +                  throw new RuntimeException(e);
 +                }
 +                unhostedRanges.get("").clear();
 +                tl.invalidateCache();
 +                failures = tl.findTablets(context, ranges, consumer, LocationNeed.NOT_REQUIRED);
 +              }
 +              binnedRanges = unhostedRanges;
              }
            }
 -        } catch (TableOfflineException | TableNotFoundException | AccumuloException
 -            | AccumuloSecurityException e) {
 +        } catch (InvalidTabletHostingRequestException | TableOfflineException
 +            | TableNotFoundException | AccumuloException | AccumuloSecurityException e) {
            throw new IOException(e);
          }
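
The else-branch above is the new path for scans that do not need hosted tablets (eventual consistency via scan servers): every range is grouped per extent under a single placeholder server key, since no tserver location exists. The shape of that grouping in isolation, with types mirroring the hunk above:

    Map<String,Map<KeyExtent,List<Range>>> unhostedRanges = new HashMap<>();
    unhostedRanges.put("", new HashMap<>()); // "" is a placeholder; no tserver is assigned
    BiConsumer<CachedTablet,Range> consumer = (ct, r) -> unhostedRanges.get("")
        .computeIfAbsent(ct.getExtent(), k -> new ArrayList<>()).add(r);

Keeping the server-keyed map shape presumably lets the downstream split-generation code stay unchanged.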
  
diff --cc hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloRecordReader.java
index b088d8530f,3b60ca798a..50c7fecc9c
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloRecordReader.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloRecordReader.java
@@@ -26,6 -23,6 +23,7 @@@ import static org.apache.accumulo.core.
  
  import java.io.IOException;
  import java.net.InetAddress;
++import java.time.Duration;
  import java.util.ArrayList;
  import java.util.Collection;
  import java.util.HashMap;
@@@ -387,52 -379,18 +385,52 @@@ public abstract class AccumuloRecordRea
              // tables tablets... so clear it
              tl.invalidateCache();
  
 -            while (!tl.binRanges(clientContext, ranges, binnedRanges).isEmpty()) {
 -              clientContext.requireNotDeleted(tableId);
 -              clientContext.requireNotOffline(tableId, tableName);
 -              binnedRanges.clear();
 -              log.warn("Unable to locate bins for specified ranges. Retrying.");
 -              // sleep randomly between 100 and 200 ms
 -              sleepUninterruptibly(100 + RANDOM.get().nextInt(100), TimeUnit.MILLISECONDS);
 -              tl.invalidateCache();
 +            if (InputConfigurator.getConsistencyLevel(callingClass, context.getConfiguration())
 +                == ConsistencyLevel.IMMEDIATE) {
 +              while (!tl.binRanges(clientContext, ranges, binnedRanges).isEmpty()) {
 +                clientContext.requireNotDeleted(tableId);
 +                clientContext.requireNotOffline(tableId, tableName);
 +                binnedRanges.clear();
 +                log.warn("Unable to locate bins for specified ranges. Retrying.");
 +                // sleep randomly between 100 and 200 ms
 +                sleepUninterruptibly(100 + RANDOM.get().nextInt(100), TimeUnit.MILLISECONDS);
 +                tl.invalidateCache();
 +              }
 +            } else {
 +              Map<String,Map<KeyExtent,List<Range>>> unhostedRanges = new HashMap<>();
 +              unhostedRanges.put("", new HashMap<>());
 +              BiConsumer<CachedTablet,Range> consumer = (ct, r) -> {
 +                unhostedRanges.get("").computeIfAbsent(ct.getExtent(), k -> new ArrayList<>())
 +                    .add(r);
 +              };
 +              List<Range> failures =
 +                  tl.findTablets(clientContext, ranges, consumer, LocationNeed.NOT_REQUIRED);
 +
-               Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS)
-                   .incrementBy(100, MILLISECONDS).maxWait(2, SECONDS).backOffFactor(1.5)
-                   .logInterval(3, MINUTES).createRetry();
++              Retry retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100))
++                  .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(2))
++                  .backOffFactor(1.5).logInterval(Duration.ofMinutes(3)).createRetry();
 +
 +              while (!failures.isEmpty()) {
 +
 +                clientContext.requireNotDeleted(tableId);
 +
 +                try {
 +                  retry.waitForNextAttempt(log,
 +                      String.format("locating tablets in table %s(%s) for %d ranges", tableName,
 +                          tableId, ranges.size()));
 +                } catch (InterruptedException e) {
 +                  throw new RuntimeException(e);
 +                }
 +                unhostedRanges.get("").clear();
 +                tl.invalidateCache();
 +                failures =
 +                    tl.findTablets(clientContext, ranges, consumer, LocationNeed.NOT_REQUIRED);
 +              }
 +              binnedRanges = unhostedRanges;
              }
            }
 -        } catch (TableOfflineException | TableNotFoundException | AccumuloException
 -            | AccumuloSecurityException e) {
 +        } catch (InvalidTabletHostingRequestException | TableOfflineException
 +            | TableNotFoundException | AccumuloException | AccumuloSecurityException e) {
            throw new IOException(e);
          }
  
diff --cc server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java
index 9e7e903a00,0000000000..4401654313
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java
@@@ -1,298 -1,0 +1,295 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.server.metadata;
 +
- import static java.util.concurrent.TimeUnit.MILLISECONDS;
- import static java.util.concurrent.TimeUnit.MINUTES;
- import static java.util.concurrent.TimeUnit.SECONDS;
- 
++import java.time.Duration;
 +import java.util.ArrayList;
 +import java.util.HashMap;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Optional;
 +import java.util.Set;
 +import java.util.stream.Collectors;
 +
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.ConditionalWriter;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.data.ConditionalMutation;
 +import org.apache.accumulo.core.dataImpl.KeyExtent;
 +import org.apache.accumulo.core.metadata.schema.Ample;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 +import org.apache.accumulo.core.util.Retry;
 +import org.apache.accumulo.server.ServerContext;
 +import org.apache.hadoop.io.Text;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.base.Preconditions;
 +import com.google.common.base.Suppliers;
 +import com.google.common.collect.Maps;
 +import com.google.common.collect.Sets;
 +
 +public class ConditionalTabletsMutatorImpl implements Ample.ConditionalTabletsMutator {
 +
 +  private static final Logger log = LoggerFactory.getLogger(ConditionalTabletsMutatorImpl.class);
 +
 +  private final ServerContext context;
 +  private Ample.DataLevel dataLevel = null;
 +
 +  private List<ConditionalMutation> mutations = new ArrayList<>();
 +
 +  private Map<Text,KeyExtent> extents = new HashMap<>();
 +
 +  private boolean active = true;
 +
 +  Map<KeyExtent,Ample.RejectionHandler> rejectedHandlers = new HashMap<>();
 +
 +  public ConditionalTabletsMutatorImpl(ServerContext context) {
 +    this.context = context;
 +  }
 +
 +  @Override
 +  public Ample.OperationRequirements mutateTablet(KeyExtent extent) {
 +    Preconditions.checkState(active);
 +
 +    var dataLevel = Ample.DataLevel.of(extent.tableId());
 +
 +    if (this.dataLevel == null) {
 +      this.dataLevel = dataLevel;
 +    } else if (!this.dataLevel.equals(dataLevel)) {
 +      throw new IllegalArgumentException(
 +          "Can not mix data levels " + this.dataLevel + " " + dataLevel);
 +    }
 +
 +    Preconditions.checkState(extents.putIfAbsent(extent.toMetaRow(), extent) == null,
 +        "Duplicate extents not handled %s", extent);
 +    return new ConditionalTabletMutatorImpl(this, context, extent, mutations::add,
 +        rejectedHandlers::put);
 +  }
 +
 +  protected ConditionalWriter createConditionalWriter(Ample.DataLevel dataLevel)
 +      throws TableNotFoundException {
 +    if (dataLevel == Ample.DataLevel.ROOT) {
 +      return new RootConditionalWriter(context);
 +    } else {
 +      return context.createConditionalWriter(dataLevel.metaTable());
 +    }
 +  }
 +
 +  protected Map<KeyExtent,TabletMetadata> readTablets(List<KeyExtent> extents) {
 +    Map<KeyExtent,TabletMetadata> failedTablets = new HashMap<>();
 +
 +    try (var tabletsMeta = context.getAmple().readTablets().forTablets(extents, Optional.empty())
 +        .saveKeyValues().build()) {
 +      tabletsMeta
 +          .forEach(tabletMetadata -> failedTablets.put(tabletMetadata.getExtent(), tabletMetadata));
 +    }
 +
 +    return failedTablets;
 +  }
 +
 +  private Map<KeyExtent,TabletMetadata>
 +      readFailedTablets(Map<KeyExtent,ConditionalWriter.Result> results) {
 +
 +    var extents = results.entrySet().stream().filter(e -> {
 +      try {
 +        return e.getValue().getStatus() != ConditionalWriter.Status.ACCEPTED;
 +      } catch (AccumuloException | AccumuloSecurityException ex) {
 +        throw new RuntimeException(ex);
 +      }
 +    }).map(Map.Entry::getKey).collect(Collectors.toList());
 +
 +    if (extents.isEmpty()) {
 +      return Map.of();
 +    }
 +
 +    return readTablets(extents);
 +  }
 +
 +  private void partitionResults(Iterator<ConditionalWriter.Result> results,
 +      List<ConditionalWriter.Result> resultsList, List<ConditionalWriter.Result> unknownResults) {
 +    while (results.hasNext()) {
 +      var result = results.next();
 +
 +      try {
 +        if (result.getStatus() == ConditionalWriter.Status.UNKNOWN) {
 +          unknownResults.add(result);
 +        } else {
 +          resultsList.add(result);
 +        }
 +      } catch (AccumuloException | AccumuloSecurityException e) {
 +        throw new RuntimeException(e);
 +      }
 +    }
 +  }
 +
 +  private Iterator<ConditionalWriter.Result> writeMutations(ConditionalWriter conditionalWriter) {
 +    var results = conditionalWriter.write(mutations.iterator());
 +
 +    List<ConditionalWriter.Result> resultsList = new ArrayList<>();
 +    List<ConditionalWriter.Result> unknownResults = new ArrayList<>();
 +    partitionResults(results, resultsList, unknownResults);
 +
 +    Retry retry = null;
 +
 +    while (!unknownResults.isEmpty()) {
 +      try {
 +        if (retry == null) {
-           retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS)
-               .incrementBy(100, MILLISECONDS).maxWait(2, SECONDS).backOffFactor(1.5)
-               .logInterval(3, MINUTES).createRetry();
++          retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100))
++              .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(2)).backOffFactor(1.5)
++              .logInterval(Duration.ofMinutes(3)).createRetry();
 +        }
 +        retry.waitForNextAttempt(log, "handle conditional mutations with unknown status");
 +      } catch (InterruptedException e) {
 +        throw new RuntimeException(e);
 +      }
 +
 +      results = conditionalWriter
 +          .write(unknownResults.stream().map(ConditionalWriter.Result::getMutation).iterator());
 +
 +      // create a new array instead of clearing in case the above has not consumed everything
 +      unknownResults = new ArrayList<>();
 +
 +      partitionResults(results, resultsList, unknownResults);
 +    }
 +
 +    return resultsList.iterator();
 +  }
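
For reference, the loop above handles ConditionalWriter.Status.UNKNOWN, which means the server may or may not have applied a mutation. Because the mutations are conditional, resubmitting them is safe. A condensed sketch of that loop (partitionResults is the helper defined earlier in this class; the Retry backoff is omitted):

    Iterator<ConditionalWriter.Result> results = conditionalWriter.write(mutations.iterator());
    List<ConditionalWriter.Result> resultsList = new ArrayList<>();
    List<ConditionalWriter.Result> unknownResults = new ArrayList<>();
    partitionResults(results, resultsList, unknownResults);
    while (!unknownResults.isEmpty()) {
      // resubmit only the mutations whose outcome is unknown
      results = conditionalWriter
          .write(unknownResults.stream().map(ConditionalWriter.Result::getMutation).iterator());
      unknownResults = new ArrayList<>(); // fresh list; the prior iterator may be lazy
      partitionResults(results, resultsList, unknownResults);
    }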
 +
 +  private Ample.ConditionalResult.Status mapStatus(KeyExtent extent,
 +      ConditionalWriter.Result result) {
 +
 +    ConditionalWriter.Status status = null;
 +    try {
 +      status = result.getStatus();
 +    } catch (AccumuloException | AccumuloSecurityException e) {
 +      throw new IllegalStateException(e);
 +    }
 +
 +    switch (status) {
 +      case REJECTED:
 +        return Ample.ConditionalResult.Status.REJECTED;
 +      case ACCEPTED:
 +        return Ample.ConditionalResult.Status.ACCEPTED;
 +      default:
 +        throw new IllegalStateException(
 +            "Unexpected conditional mutation status : " + extent + " " + status);
 +    }
 +  }
 +
 +  @Override
 +  public Map<KeyExtent,Ample.ConditionalResult> process() {
 +    Preconditions.checkState(active);
 +    if (dataLevel != null) {
 +      try (ConditionalWriter conditionalWriter = createConditionalWriter(dataLevel)) {
 +        var results = writeMutations(conditionalWriter);
 +
 +        var resultsMap = new HashMap<KeyExtent,ConditionalWriter.Result>();
 +
 +        while (results.hasNext()) {
 +          var result = results.next();
 +          var row = new Text(result.getMutation().getRow());
 +          resultsMap.put(extents.get(row), result);
 +        }
 +
 +        var extentsSet = Set.copyOf(extents.values());
 +        ensureAllExtentsSeen(resultsMap, extentsSet);
 +
 +        // only fetch the metadata for failures when requested and when it is requested fetch all
 +        // of the failed extents at once to avoid fetching them one by one.
 +        var failedMetadata = Suppliers.memoize(() -> readFailedTablets(resultsMap));
 +
 +        return Maps.transformEntries(resultsMap, (extent, result) -> new Ample.ConditionalResult() {
 +
 +          private Ample.ConditionalResult.Status _getStatus() {
 +            return mapStatus(extent, result);
 +          }
 +
 +          @Override
 +          public Status getStatus() {
 +            var status = _getStatus();
 +            if (status == Status.REJECTED && rejectedHandlers.containsKey(extent)) {
 +              var tabletMetadata = readMetadata();
 +              var handler = rejectedHandlers.get(extent);
 +              if (tabletMetadata == null && handler.callWhenTabletDoesNotExists()
 +                  && handler.test(null)) {
 +                return Status.ACCEPTED;
 +              }
 +              if (tabletMetadata != null && handler.test(tabletMetadata)) {
 +                return Status.ACCEPTED;
 +              }
 +            }
 +
 +            return status;
 +          }
 +
 +          @Override
 +          public KeyExtent getExtent() {
 +            return extent;
 +          }
 +
 +          @Override
 +          public TabletMetadata readMetadata() {
 +            Preconditions.checkState(_getStatus() != Status.ACCEPTED,
 +                "Can not read metadata for mutations with a status of " + Status.ACCEPTED);
 +            return failedMetadata.get().get(getExtent());
 +          }
 +        });
 +      } catch (TableNotFoundException e) {
 +        throw new RuntimeException(e);
 +      } finally {
 +        // render inoperable because reuse is not tested
 +        extents.clear();
 +        mutations.clear();
 +        active = false;
 +      }
 +    } else {
 +      // render inoperable because reuse is not tested
 +      extents.clear();
 +      mutations.clear();
 +      active = false;
 +      return Map.of();
 +    }
 +  }
 +
 +  private void ensureAllExtentsSeen(HashMap<KeyExtent,ConditionalWriter.Result> resultsMap,
 +      Set<KeyExtent> extentsSet) {
 +    if (!resultsMap.keySet().equals(Set.copyOf(extents.values()))) {
 +      // ELASTICITY_TODO this check can trigger if someone forgets to submit, could check for
 +      // that
 +
 +      Sets.difference(resultsMap.keySet(), extentsSet)
 +          .forEach(extent -> log.error("Unexpected extent seen in result {}", extent));
 +
 +      Sets.difference(extentsSet, resultsMap.keySet())
 +          .forEach(extent -> log.error("Expected extent not seen in result {}", extent));
 +
 +      resultsMap.forEach((keyExtent, result) -> {
 +        log.error("result seen {} {}", keyExtent, new Text(result.getMutation().getRow()));
 +      });
 +
 +      throw new AssertionError("Not all extents were seen, this is unexpected");
 +    }
 +  }
 +
 +  @Override
 +  public void close() {}
 +}
diff --cc server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index 2c8dae7671,0000000000..ebd5d8f764
mode 100644,000000..100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@@ -1,1016 -1,0 +1,1016 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.manager.compaction.coordinator;
 +
- import static java.util.concurrent.TimeUnit.MILLISECONDS;
 +import static java.util.concurrent.TimeUnit.MINUTES;
 +import static java.util.concurrent.TimeUnit.SECONDS;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.COMPACTED;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.ECOMP;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED;
 +
 +import java.io.FileNotFoundException;
 +import java.io.IOException;
 +import java.io.UncheckedIOException;
++import java.time.Duration;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Optional;
 +import java.util.Set;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.ScheduledFuture;
 +import java.util.concurrent.ScheduledThreadPoolExecutor;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicReference;
 +import java.util.function.Consumer;
 +import java.util.stream.Collectors;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.TableDeletedException;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.client.admin.CompactionConfig;
 +import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
 +import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode;
 +import org.apache.accumulo.core.clientImpl.thrift.TInfo;
 +import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
 +import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
 +import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
 +import org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException;
 +import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService;
 +import org.apache.accumulo.core.compaction.thrift.TCompactionState;
 +import org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate;
 +import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
 +import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.NamespaceId;
 +import org.apache.accumulo.core.dataImpl.KeyExtent;
 +import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
 +import org.apache.accumulo.core.fate.Fate;
 +import org.apache.accumulo.core.fate.FateId;
 +import org.apache.accumulo.core.fate.FateInstanceType;
 +import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
 +import org.apache.accumulo.core.iterators.user.HasExternalCompactionsFilter;
 +import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil;
 +import org.apache.accumulo.core.metadata.CompactableFileImpl;
 +import org.apache.accumulo.core.metadata.ReferencedTabletFile;
 +import org.apache.accumulo.core.metadata.StoredTabletFile;
 +import org.apache.accumulo.core.metadata.schema.Ample;
 +import org.apache.accumulo.core.metadata.schema.Ample.RejectionHandler;
 +import org.apache.accumulo.core.metadata.schema.CompactionMetadata;
 +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
 +import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 +import org.apache.accumulo.core.metrics.MetricsProducer;
 +import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
 +import org.apache.accumulo.core.spi.compaction.CompactionJob;
 +import org.apache.accumulo.core.spi.compaction.CompactionKind;
 +import org.apache.accumulo.core.spi.compaction.CompactorGroupId;
 +import org.apache.accumulo.core.tabletserver.thrift.InputFile;
 +import org.apache.accumulo.core.tabletserver.thrift.IteratorConfig;
 +import org.apache.accumulo.core.tabletserver.thrift.TCompactionKind;
 +import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats;
 +import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
 +import org.apache.accumulo.core.util.Retry;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.core.util.cache.Caches.CacheName;
 +import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
 +import org.apache.accumulo.core.util.compaction.RunningCompaction;
 +import org.apache.accumulo.core.util.threads.ThreadPools;
 +import org.apache.accumulo.core.util.threads.Threads;
 +import org.apache.accumulo.core.volume.Volume;
 +import org.apache.accumulo.manager.Manager;
 +import org.apache.accumulo.manager.compaction.coordinator.commit.CommitCompaction;
 +import org.apache.accumulo.manager.compaction.coordinator.commit.CompactionCommitData;
 +import org.apache.accumulo.manager.compaction.coordinator.commit.RenameCompactionFile;
 +import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues;
 +import org.apache.accumulo.server.ServerContext;
 +import org.apache.accumulo.server.compaction.CompactionConfigStorage;
 +import org.apache.accumulo.server.compaction.CompactionPluginUtils;
 +import org.apache.accumulo.server.security.SecurityOperation;
 +import org.apache.accumulo.server.tablets.TabletNameGenerator;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.thrift.TException;
 +import org.apache.zookeeper.KeeperException;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.github.benmanes.caffeine.cache.Cache;
 +import com.github.benmanes.caffeine.cache.CacheLoader;
 +import com.github.benmanes.caffeine.cache.LoadingCache;
 +import com.github.benmanes.caffeine.cache.Weigher;
 +import com.google.common.base.Preconditions;
 +import com.google.common.collect.Sets;
 +import com.google.common.net.HostAndPort;
 +
 +import io.micrometer.core.instrument.Gauge;
 +import io.micrometer.core.instrument.MeterRegistry;
 +
 +public class CompactionCoordinator
 +    implements CompactionCoordinatorService.Iface, Runnable, MetricsProducer {
 +
 +  private static final Logger LOG = LoggerFactory.getLogger(CompactionCoordinator.class);
 +
 +  /*
 +   * Map of compactionId to RunningCompactions. This is an informational cache of what external
 +   * compactions may be running. It's possible it may contain external compactions that are not
 +   * actually running. It may not contain compactions that are actually running. The metadata table
 +   * is the most authoritative source of what external compactions are currently running, but it
 +   * does not have the stats that this map has.
 +   */
 +  protected final Map<ExternalCompactionId,RunningCompaction> RUNNING_CACHE =
 +      new ConcurrentHashMap<>();
 +
 +  /* Map of group name to last time compactor called to get a compaction job */
 +  // ELASTICITY_TODO need to clean out groups that are no longer configured..
 +  private final Map<CompactorGroupId,Long> TIME_COMPACTOR_LAST_CHECKED = new ConcurrentHashMap<>();
 +
 +  private final ServerContext ctx;
 +  private final SecurityOperation security;
 +  private final CompactionJobQueues jobQueues;
 +  private final AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances;
 +  // Exposed for tests
 +  protected volatile Boolean shutdown = false;
 +
 +  private final ScheduledThreadPoolExecutor schedExecutor;
 +
 +  private final Cache<ExternalCompactionId,RunningCompaction> completed;
 +  private final LoadingCache<FateId,CompactionConfig> compactionConfigCache;
 +  private final Cache<Path,Integer> tabletDirCache;
 +  private final DeadCompactionDetector deadCompactionDetector;
 +
 +  private final QueueMetrics queueMetrics;
 +
 +  public CompactionCoordinator(ServerContext ctx, SecurityOperation security,
 +      AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances) {
 +    this.ctx = ctx;
 +    this.schedExecutor = this.ctx.getScheduledExecutor();
 +    this.security = security;
 +
 +    this.jobQueues = new CompactionJobQueues(
 +        ctx.getConfiguration().getCount(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE));
 +
 +    this.queueMetrics = new QueueMetrics(jobQueues);
 +
 +    this.fateInstances = fateInstances;
 +
 +    completed = ctx.getCaches().createNewBuilder(CacheName.COMPACTIONS_COMPLETED, true)
 +        .maximumSize(200).expireAfterWrite(10, TimeUnit.MINUTES).build();
 +
 +    CacheLoader<FateId,CompactionConfig> loader =
 +        fateId -> CompactionConfigStorage.getConfig(ctx, fateId);
 +
 +    // Keep a small short lived cache of compaction config. Compaction config never changes, however
 +    // when a compaction is canceled it is deleted which is why there is a time limit. It does not
 +    // hurt to let a job that was canceled start, it will be canceled later. Caching this immutable
 +    // config will help avoid reading the same data over and over.
 +    compactionConfigCache = ctx.getCaches().createNewBuilder(CacheName.COMPACTION_CONFIGS, true)
 +        .expireAfterWrite(30, SECONDS).maximumSize(100).build(loader);
 +
 +    Weigher<Path,Integer> weigher = (path, count) -> {
 +      return path.toUri().toString().length();
 +    };
 +
 +    tabletDirCache = ctx.getCaches().createNewBuilder(CacheName.COMPACTION_DIR_CACHE, true)
 +        .maximumWeight(10485760L).weigher(weigher).build();
 +
 +    deadCompactionDetector = new DeadCompactionDetector(this.ctx, this, schedExecutor);
 +    // At this point the manager does not have its lock so no actions should be taken yet
 +  }
 +
 +  private volatile Thread serviceThread = null;
 +
 +  public void start() {
 +    serviceThread = Threads.createThread("CompactionCoordinator Thread", this);
 +    serviceThread.start();
 +  }
 +
 +  public void shutdown() {
 +    shutdown = true;
 +    var localThread = serviceThread;
 +    if (localThread != null) {
 +      try {
 +        localThread.join();
 +      } catch (InterruptedException e) {
 +        LOG.error("Exception stopping compaction coordinator thread", e);
 +      }
 +    }
 +  }
 +
 +  protected void startCompactionCleaner(ScheduledThreadPoolExecutor schedExecutor) {
 +    ScheduledFuture<?> future =
 +        schedExecutor.scheduleWithFixedDelay(this::cleanUpCompactors, 0, 5, TimeUnit.MINUTES);
 +    ThreadPools.watchNonCriticalScheduledTask(future);
 +  }
 +
 +  protected void startRunningCleaner(ScheduledThreadPoolExecutor schedExecutor) {
 +    ScheduledFuture<?> future =
 +        schedExecutor.scheduleWithFixedDelay(this::cleanUpRunning, 0, 5, TimeUnit.MINUTES);
 +    ThreadPools.watchNonCriticalScheduledTask(future);
 +  }
 +
 +  @Override
 +  public void run() {
 +
 +    startCompactionCleaner(schedExecutor);
 +    startRunningCleaner(schedExecutor);
 +
 +    // On a re-start of the coordinator it's possible that external compactions are in-progress.
 +    // Attempt to get the running compactions on the compactors and then resolve which tserver
 +    // the external compaction came from to re-populate the RUNNING collection.
 +    LOG.info("Checking for running external compactions");
 +    // On re-start contact the running Compactors to try and seed the list of running compactions
 +    List<RunningCompaction> running = getCompactionsRunningOnCompactors();
 +    if (running.isEmpty()) {
 +      LOG.info("No running external compactions found");
 +    } else {
 +      LOG.info("Found {} running external compactions", running.size());
 +      running.forEach(rc -> {
 +        TCompactionStatusUpdate update = new TCompactionStatusUpdate();
 +        update.setState(TCompactionState.IN_PROGRESS);
 +        update.setMessage("Coordinator restarted, compaction found in progress");
 +        rc.addUpdate(System.currentTimeMillis(), update);
 +        RUNNING_CACHE.put(ExternalCompactionId.of(rc.getJob().getExternalCompactionId()), rc);
 +      });
 +    }
 +
 +    startDeadCompactionDetector();
 +
 +    // ELASTICITY_TODO the main function of the following loop was getting group summaries from
 +    // tservers. It's no longer doing that. May be best to remove the loop and make the remaining
 +    // task a scheduled one.
 +
 +    LOG.info("Starting loop to check for compactors not checking in");
 +    while (!shutdown) {
 +      long start = System.currentTimeMillis();
 +
 +      long now = System.currentTimeMillis();
 +      TIME_COMPACTOR_LAST_CHECKED.forEach((k, v) -> {
 +        if ((now - v) > getMissingCompactorWarningTime()) {
 +          // ELASTICITY_TODO may want to consider if the group has any jobs queued OR if the group
 +          // still exists in configuration
 +          LOG.warn("No compactors have checked in with coordinator for group {} in {}ms", k,
 +              getMissingCompactorWarningTime());
 +        }
 +      });
 +
 +      long checkInterval = getTServerCheckInterval();
 +      long duration = (System.currentTimeMillis() - start);
 +      if (checkInterval - duration > 0) {
 +        LOG.debug("Waiting {}ms for next group check", (checkInterval - duration));
 +        UtilWaitThread.sleep(checkInterval - duration);
 +      }
 +    }
 +
 +    LOG.info("Shutting down");
 +  }
 +
 +  protected void startDeadCompactionDetector() {
 +    deadCompactionDetector.start();
 +  }
 +
 +  protected long getMissingCompactorWarningTime() {
 +    return this.ctx.getConfiguration().getTimeInMillis(Property.COMPACTOR_MAX_JOB_WAIT_TIME) * 3;
 +  }
 +
 +  protected long getTServerCheckInterval() {
 +    return this.ctx.getConfiguration()
 +        .getTimeInMillis(Property.COMPACTION_COORDINATOR_TSERVER_COMPACTION_CHECK_INTERVAL);
 +  }
 +
 +  public long getNumRunningCompactions() {
 +    return RUNNING_CACHE.size();
 +  }
 +
 +  /**
 +   * Return the next compaction job from the queue to a Compactor
 +   *
 +   * @param groupName group
 +   * @param compactorAddress compactor address
 +   * @throws ThriftSecurityException when permission error
 +   * @return compaction job
 +   */
 +  @Override
 +  public TExternalCompactionJob getCompactionJob(TInfo tinfo, TCredentials credentials,
 +      String groupName, String compactorAddress, String externalCompactionId)
 +      throws ThriftSecurityException {
 +
 +    // do not expect users to call this directly, expect compactors to call this method
 +    if (!security.canPerformSystemActions(credentials)) {
 +      throw new AccumuloSecurityException(credentials.getPrincipal(),
 +          SecurityErrorCode.PERMISSION_DENIED).asThriftException();
 +    }
 +    CompactorGroupId groupId = CompactorGroupId.of(groupName);
 +    LOG.trace("getCompactionJob called for group {} by compactor {}", groupId, compactorAddress);
 +    TIME_COMPACTOR_LAST_CHECKED.put(groupId, System.currentTimeMillis());
 +
 +    TExternalCompactionJob result = null;
 +
 +    CompactionJobQueues.MetaJob metaJob = jobQueues.poll(groupId);
 +
 +    while (metaJob != null) {
 +
 +      Optional<CompactionConfig> compactionConfig = getCompactionConfig(metaJob);
 +
 +      // this method may reread the metadata, do not use the metadata in metaJob for anything after
 +      // this method
 +      CompactionMetadata ecm = null;
 +
 +      var kind = metaJob.getJob().getKind();
 +
 +      // Only reserve user compactions when the config is present. When compactions are canceled the
 +      // config is deleted.
 +      if (kind == CompactionKind.SYSTEM
 +          || (kind == CompactionKind.USER && compactionConfig.isPresent())) {
 +        ecm = reserveCompaction(metaJob, compactorAddress,
 +            ExternalCompactionId.from(externalCompactionId));
 +      }
 +
 +      if (ecm != null) {
 +        result = createThriftJob(externalCompactionId, ecm, metaJob, compactionConfig);
 +        // It is possible that by the time this is added, the compactor that made this request
 +        // is dead. In this case the compaction is not actually running.
 +        RUNNING_CACHE.put(ExternalCompactionId.of(result.getExternalCompactionId()),
 +            new RunningCompaction(result, compactorAddress, groupName));
 +        LOG.debug("Returning external job {} to {} with {} files", result.externalCompactionId,
 +            compactorAddress, ecm.getJobFiles().size());
 +        break;
 +      } else {
 +        LOG.debug(
 +            "Unable to reserve compaction job for {}, pulling another off the queue for group {}",
 +            metaJob.getTabletMetadata().getExtent(), groupName);
 +        metaJob = jobQueues.poll(CompactorGroupId.of(groupName));
 +      }
 +    }
 +
 +    if (metaJob == null) {
 +      LOG.debug("No jobs found in group {} ", groupName);
 +    }
 +
 +    if (result == null) {
 +      LOG.trace("No jobs found for group {}, returning empty job to compactor {}", groupName,
 +          compactorAddress);
 +      result = new TExternalCompactionJob();
 +    }
 +
 +    return result;
 +
 +  }
 +
 +  // ELASTICITY_TODO unit test this code
 +  private boolean canReserveCompaction(TabletMetadata tablet, CompactionJob job,
 +      Set<StoredTabletFile> jobFiles) {
 +
 +    if (tablet == null) {
 +      // the tablet no longer exists
 +      return false;
 +    }
 +
 +    if (tablet.getOperationId() != null) {
 +      return false;
 +    }
 +
 +    if (!tablet.getFiles().containsAll(jobFiles)) {
 +      return false;
 +    }
 +
 +    var currentlyCompactingFiles = tablet.getExternalCompactions().values().stream()
 +        .flatMap(ecm -> ecm.getJobFiles().stream()).collect(Collectors.toSet());
 +
 +    if (!Collections.disjoint(jobFiles, currentlyCompactingFiles)) {
 +      return false;
 +    }
 +
 +    switch (job.getKind()) {
 +      case SYSTEM:
 +        var userRequestedCompactions = tablet.getUserCompactionsRequested().size();
 +        if (userRequestedCompactions > 0) {
 +          LOG.debug(
 +              "Unable to reserve {} for system compaction, tablet has {} pending requested user compactions",
 +              tablet.getExtent(), userRequestedCompactions);
 +          return false;
 +        } else if (tablet.getSelectedFiles() != null
 +            && !Collections.disjoint(jobFiles, tablet.getSelectedFiles().getFiles())) {
 +          return false;
 +        }
 +        break;
 +      case USER:
 +      case SELECTOR:
 +        if (tablet.getSelectedFiles() == null
 +            || !tablet.getSelectedFiles().getFiles().containsAll(jobFiles)) {
 +          return false;
 +        }
 +        break;
 +      default:
 +        throw new UnsupportedOperationException("Not currently handling " + job.getKind());
 +    }
 +
 +    return true;
 +  }
 +
 +  private void checkTabletDir(KeyExtent extent, Path path) {
 +    try {
 +      if (tabletDirCache.getIfPresent(path) == null) {
 +        FileStatus[] files = null;
 +        try {
 +          files = ctx.getVolumeManager().listStatus(path);
 +        } catch (FileNotFoundException ex) {
 +          // ignored
 +        }
 +
 +        if (files == null) {
 +          LOG.debug("Tablet {} had no dir, creating {}", extent, path);
 +
 +          ctx.getVolumeManager().mkdirs(path);
 +        }
 +        tabletDirCache.put(path, 1);
 +      }
 +    } catch (IOException e) {
 +      throw new UncheckedIOException(e);
 +    }
 +  }
 +
 +  protected CompactionMetadata createExternalCompactionMetadata(CompactionJob job,
 +      Set<StoredTabletFile> jobFiles, TabletMetadata tablet, String compactorAddress,
 +      ExternalCompactionId externalCompactionId) {
 +    boolean propDels;
 +
 +    FateId fateId = null;
 +
 +    switch (job.getKind()) {
 +      case SYSTEM: {
 +        boolean compactingAll = tablet.getFiles().equals(jobFiles);
 +        propDels = !compactingAll;
 +      }
 +        break;
 +      case SELECTOR:
 +      case USER: {
 +        boolean compactingAll = tablet.getSelectedFiles().initiallySelectedAll()
 +            && tablet.getSelectedFiles().getFiles().equals(jobFiles);
 +        propDels = !compactingAll;
 +        fateId = tablet.getSelectedFiles().getFateId();
 +      }
 +        break;
 +      default:
 +        throw new IllegalArgumentException();
 +    }
 +
 +    Consumer<String> directoryCreator = dir -> checkTabletDir(tablet.getExtent(), new Path(dir));
 +    ReferencedTabletFile newFile = TabletNameGenerator.getNextDataFilenameForMajc(propDels, ctx,
 +        tablet, directoryCreator, externalCompactionId);
 +
 +    return new CompactionMetadata(jobFiles, newFile, compactorAddress, job.getKind(),
 +        job.getPriority(), job.getGroup(), propDels, fateId);
 +
 +  }
 +
 +  protected CompactionMetadata reserveCompaction(CompactionJobQueues.MetaJob metaJob,
 +      String compactorAddress, ExternalCompactionId externalCompactionId) {
 +
 +    Preconditions.checkArgument(metaJob.getJob().getKind() == CompactionKind.SYSTEM
 +        || metaJob.getJob().getKind() == CompactionKind.USER);
 +
 +    var tabletMetadata = metaJob.getTabletMetadata();
 +
 +    var jobFiles = metaJob.getJob().getFiles().stream().map(CompactableFileImpl::toStoredTabletFile)
 +        .collect(Collectors.toSet());
 +
-     Retry retry =
-         Retry.builder().maxRetries(5).retryAfter(100, MILLISECONDS).incrementBy(100, MILLISECONDS)
-             .maxWait(10, SECONDS).backOffFactor(1.5).logInterval(3, MINUTES).createRetry();
++    Retry retry = Retry.builder().maxRetries(5).retryAfter(Duration.ofMillis(100))
++        .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(10)).backOffFactor(1.5)
++        .logInterval(Duration.ofMinutes(3)).createRetry();
 +
 +    while (retry.canRetry()) {
 +      try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) {
 +        var extent = metaJob.getTabletMetadata().getExtent();
 +
 +        if (!canReserveCompaction(tabletMetadata, metaJob.getJob(), jobFiles)) {
 +          return null;
 +        }
 +
 +        var ecm = createExternalCompactionMetadata(metaJob.getJob(), jobFiles, tabletMetadata,
 +            compactorAddress, externalCompactionId);
 +
 +        // any data that is read from the tablet to make a decision about if it can compact or not
 +        // must be included in the requireSame call
 +        var tabletMutator = tabletsMutator.mutateTablet(extent).requireAbsentOperation()
 +            .requireSame(tabletMetadata, FILES, SELECTED, ECOMP);
 +
 +        tabletMutator.putExternalCompaction(externalCompactionId, ecm);
 +        tabletMutator.submit(tm -> tm.getExternalCompactions().containsKey(externalCompactionId));
 +
 +        var result = tabletsMutator.process().get(extent);
 +
 +        if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) {
 +          return ecm;
 +        } else {
 +          tabletMetadata = result.readMetadata();
 +        }
 +      }
 +
 +      retry.useRetry();
 +      try {
 +        retry.waitForNextAttempt(LOG,
 +            "Reserved compaction for " + metaJob.getTabletMetadata().getExtent());
 +      } catch (InterruptedException e) {
 +        throw new RuntimeException(e);
 +      }
 +    }
 +
 +    return null;
 +  }
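
Unlike the infiniteRetries() builders elsewhere in this merge, reserveCompaction() above bounds its retries at five rejected conditional mutations. The canRetry()/useRetry() skeleton of that loop; tryReserve() is a hypothetical stand-in for the conditional-mutation block above:

    Retry retry = Retry.builder().maxRetries(5).retryAfter(Duration.ofMillis(100))
        .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(10)).backOffFactor(1.5)
        .logInterval(Duration.ofMinutes(3)).createRetry();
    while (retry.canRetry()) {
      CompactionMetadata ecm = tryReserve(); // hypothetical: one conditional-mutation attempt
      if (ecm != null) {
        return ecm; // ACCEPTED
      }
      retry.useRetry(); // consume one of the five attempts
      try {
        retry.waitForNextAttempt(LOG, "reserve compaction");
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
    }
    return null; // give up; the caller polls another job off the queue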
 +
 +  protected TExternalCompactionJob createThriftJob(String externalCompactionId,
 +      CompactionMetadata ecm, CompactionJobQueues.MetaJob metaJob,
 +      Optional<CompactionConfig> compactionConfig) {
 +
 +    Set<CompactableFile> selectedFiles;
 +    if (metaJob.getJob().getKind() == CompactionKind.SYSTEM) {
 +      selectedFiles = Set.of();
 +    } else {
 +      selectedFiles = 
metaJob.getTabletMetadata().getSelectedFiles().getFiles().stream()
 +          .map(file -> new CompactableFileImpl(file,
 +              metaJob.getTabletMetadata().getFilesMap().get(file)))
 +          .collect(Collectors.toUnmodifiableSet());
 +    }
 +
 +    Map<String,String> overrides = 
CompactionPluginUtils.computeOverrides(compactionConfig, ctx,
 +        metaJob.getTabletMetadata().getExtent(), metaJob.getJob().getFiles(), 
selectedFiles);
 +
 +    IteratorConfig iteratorSettings = SystemIteratorUtil
 +        
.toIteratorConfig(compactionConfig.map(CompactionConfig::getIterators).orElse(List.of()));
 +
 +    var files = ecm.getJobFiles().stream().map(storedTabletFile -> {
 +      var dfv = 
metaJob.getTabletMetadata().getFilesMap().get(storedTabletFile);
 +      return new InputFile(storedTabletFile.getMetadata(), dfv.getSize(), 
dfv.getNumEntries(),
 +          dfv.getTime());
 +    }).collect(Collectors.toList());
 +
 +    FateInstanceType type = 
FateInstanceType.fromTableId(metaJob.getTabletMetadata().getTableId());
 +    FateId fateId = FateId.from(type, 0);
 +    if (metaJob.getJob().getKind() == CompactionKind.USER) {
 +      fateId = metaJob.getTabletMetadata().getSelectedFiles().getFateId();
 +    }
 +
 +    return new TExternalCompactionJob(externalCompactionId,
 +        metaJob.getTabletMetadata().getExtent().toThrift(), files, 
iteratorSettings,
 +        ecm.getCompactTmpName().getNormalizedPathStr(), 
ecm.getPropagateDeletes(),
 +        TCompactionKind.valueOf(ecm.getKind().name()), fateId.toThrift(), 
overrides);
 +  }
 +
 +  @Override
 +  public void registerMetrics(MeterRegistry registry) {
 +    Gauge.builder(METRICS_MAJC_QUEUED, jobQueues, CompactionJobQueues::getQueuedJobCount)
 +        .description("Number of queued major compactions").register(registry);
 +    Gauge.builder(METRICS_MAJC_RUNNING, this, CompactionCoordinator::getNumRunningCompactions)
 +        .description("Number of running major compactions").register(registry);
 +
 +    queueMetrics.registerMetrics(registry);
 +  }
 +
 +  public void addJobs(TabletMetadata tabletMetadata, Collection<CompactionJob> jobs) {
 +    jobQueues.add(tabletMetadata, jobs);
 +  }
 +
 +  public CompactionCoordinatorService.Iface getThriftService() {
 +    return this;
 +  }
 +
 +  private Optional<CompactionConfig> getCompactionConfig(CompactionJobQueues.MetaJob metaJob) {
 +    if (metaJob.getJob().getKind() == CompactionKind.USER
 +        && metaJob.getTabletMetadata().getSelectedFiles() != null) {
 +      var cconf =
 +          compactionConfigCache.get(metaJob.getTabletMetadata().getSelectedFiles().getFateId());
 +      return Optional.ofNullable(cconf);
 +    }
 +    return Optional.empty();
 +  }
 +
 +  /**
 +   * Compactors call this method when they have finished a compaction. This method does the
 +   * following.
 +   *
 +   * <ol>
 +   * <li>Reads the tablet's metadata and determines if the compaction can commit. It's possible
 +   * that things changed while the compaction was running and it can no longer commit.</li>
 +   * <li>Commits the compaction using a conditional mutation. If the tablet's files or location
 +   * changed since reading the tablet's metadata, then the conditional mutation will fail. When
 +   * this happens it rereads the metadata and conceptually goes back to step 1. When committing a
 +   * compaction, the compacted files are removed and scan entries are added to the tablet in case
 +   * the files are in use; this prevents GC from deleting the files between updating the tablet
 +   * metadata and refreshing the tablet. The scan entries are only added when a tablet has a
 +   * location.</li>
 +   * <li>After a successful commit, a refresh request is sent to the tablet if it has a location.
 +   * This causes the tablet to start using the newly compacted files for future scans. The tablet
 +   * can also delete the scan entries if there are no active scans using them.</li>
 +   * </ol>
 +   *
 +   * <p>
 +   * User compactions are refreshed as part of the fate operation. The user compaction fate
 +   * operation will see that the compaction was committed after this code updates the tablet
 +   * metadata; however, if it relied on this code to do the refresh, it would not know when the
 +   * refresh was actually done. Therefore, user compactions refresh as part of the fate operation
 +   * so that the refresh is known to be done before the fate operation returns. Since the fate
 +   * operation will do it, there is no need to do it here for user compactions.
 +   *
 +   * @param tinfo trace info
 +   * @param credentials tcredentials object
 +   * @param externalCompactionId compaction id
 +   * @param textent tablet extent
 +   * @param stats compaction stats
 +   * @throws ThriftSecurityException when permission error
 +   */
 +  @Override
 +  public void compactionCompleted(TInfo tinfo, TCredentials credentials,
 +      String externalCompactionId, TKeyExtent textent, TCompactionStats stats)
 +      throws ThriftSecurityException {
 +    // do not expect users to call this directly, expect other tservers to call this method
 +    if (!security.canPerformSystemActions(credentials)) {
 +      throw new AccumuloSecurityException(credentials.getPrincipal(),
 +          SecurityErrorCode.PERMISSION_DENIED).asThriftException();
 +    }
 +
 +    // maybe fate has not started yet
 +    var localFates = fateInstances.get();
 +    while (localFates == null) {
 +      UtilWaitThread.sleep(100);
 +      if (shutdown) {
 +        return;
 +      }
 +      localFates = fateInstances.get();
 +    }
 +
 +    var extent = KeyExtent.fromThrift(textent);
 +    var localFate = localFates.get(FateInstanceType.fromTableId(extent.tableId()));
 +
 +    LOG.info("Compaction completed, id: {}, stats: {}, extent: {}", externalCompactionId, stats,
 +        extent);
 +    final var ecid = ExternalCompactionId.of(externalCompactionId);
 +
 +    var tabletMeta =
 +        ctx.getAmple().readTablet(extent, ECOMP, SELECTED, LOCATION, FILES, COMPACTED, OPID);
 +
 +    if (!CommitCompaction.canCommitCompaction(ecid, tabletMeta)) {
 +      return;
 +    }
 +
 +    CompactionMetadata ecm = tabletMeta.getExternalCompactions().get(ecid);
 +    var renameOp = new RenameCompactionFile(new CompactionCommitData(ecid, extent, ecm, stats));
 +
 +    // ELASTICITY_TODO add tag to fate that ECID can be added to. This solves two problems. First
 +    // it defends against starting multiple fate txs for the same thing. This will help the split
 +    // code also. Second the tag can be used by the dead compaction detector to ignore committing
 +    // compactions. The impl could hash the key to produce the fate tx id.
 +    var txid = localFate.startTransaction();
 +    localFate.seedTransaction("COMMIT_COMPACTION", txid, renameOp, true,
 +        "Commit compaction " + ecid);
 +
 +    // ELASTICITY_TODO need to remove this wait. It is here because when the dead compaction
 +    // detector asks a compactor what it is currently running, it expects that to cover commit. To
 +    // remove this wait would need another way for the dead compaction detector to know about
 +    // committing compactions. Could add a tag to the fate tx with the ecid and have the dead
 +    // compaction detector scan these tags. This wait makes the code running in fate not fault
 +    // tolerant because in the case of faults the dead compaction detector may remove the
 +    // compaction entry.
 +    localFate.waitForCompletion(txid);
 +
 +    // It's possible that RUNNING might not have an entry for this ecid in the case of a
 +    // coordinator restart when the Coordinator can't find the TServer for the corresponding
 +    // external compaction.
 +    recordCompletion(ecid);
 +    // ELASTICITY_TODO should the above call move into fate code?
 +  }
 +
 +  @Override
 +  public void compactionFailed(TInfo tinfo, TCredentials credentials, String externalCompactionId,
 +      TKeyExtent extent) throws ThriftSecurityException {
 +    // do not expect users to call this directly, expect other tservers to call this method
 +    if (!security.canPerformSystemActions(credentials)) {
 +      throw new AccumuloSecurityException(credentials.getPrincipal(),
 +          SecurityErrorCode.PERMISSION_DENIED).asThriftException();
 +    }
 +    KeyExtent fromThriftExtent = KeyExtent.fromThrift(extent);
 +    LOG.info("Compaction failed, id: {}, extent: {}", externalCompactionId, fromThriftExtent);
 +    final var ecid = ExternalCompactionId.of(externalCompactionId);
 +    compactionFailed(Map.of(ecid, KeyExtent.fromThrift(extent)));
 +  }
 +
 +  void compactionFailed(Map<ExternalCompactionId,KeyExtent> compactions) {
 +
 +    try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) {
 +      compactions.forEach((ecid, extent) -> {
 +        try {
 +          ctx.requireNotDeleted(extent.tableId());
 +          tabletsMutator.mutateTablet(extent).requireAbsentOperation().requireCompaction(ecid)
 +              .deleteExternalCompaction(ecid).submit(new RejectionHandler() {
 +
 +                @Override
 +                public boolean callWhenTabletDoesNotExists() {
 +                  return true;
 +                }
 +
 +                @Override
 +                public boolean test(TabletMetadata tabletMetadata) {
 +                  return tabletMetadata == null
 +                      || !tabletMetadata.getExternalCompactions().containsKey(ecid);
 +                }
 +
 +              });
 +        } catch (TableDeletedException e) {
 +          LOG.warn("Table {} was deleted, unable to update metadata for compaction failure.",
 +              extent.tableId());
 +        }
 +      });
 +
 +      final List<ExternalCompactionId> ecidsForTablet = new ArrayList<>();
 +      tabletsMutator.process().forEach((extent, result) -> {
 +        if (result.getStatus() != Ample.ConditionalResult.Status.ACCEPTED) {
 +
 +          // this should try again later when the dead compaction detector runs; let's log it in
 +          // case it's a persistent problem
 +          if (LOG.isDebugEnabled()) {
 +            var ecid =
 +                compactions.entrySet().stream().filter(entry -> entry.getValue().equals(extent))
 +                    .findFirst().map(Map.Entry::getKey).orElse(null);
 +            LOG.debug("Unable to remove failed compaction {} {}", extent, ecid);
 +          }
 +          }
 +        } else {
 +          // compactionFailed is called from the Compactor when either a compaction fails or is
 +          // cancelled, and it's called from the DeadCompactionDetector. This block is entered
 +          // when the conditional mutator above successfully deletes an ecid from the tablet
 +          // metadata. Remove compaction tmp files from the tablet directory that have a
 +          // corresponding ecid in the name.
 +
 +          ecidsForTablet.clear();
 +          compactions.entrySet().stream().filter(e -> e.getValue().compareTo(extent) == 0)
 +              .map(Entry::getKey).forEach(ecidsForTablet::add);
 +
 +          if (!ecidsForTablet.isEmpty()) {
 +            final TabletMetadata tm = ctx.getAmple().readTablet(extent, ColumnType.DIR);
 +            if (tm != null) {
 +              final Collection<Volume> vols = ctx.getVolumeManager().getVolumes();
 +              for (Volume vol : vols) {
 +                try {
 +                  final String volPath =
 +                      vol.getBasePath() + Constants.HDFS_TABLES_DIR + Path.SEPARATOR
 +                          + extent.tableId().canonical() + Path.SEPARATOR + tm.getDirName();
 +                  final FileSystem fs = vol.getFileSystem();
 +                  for (ExternalCompactionId ecid : ecidsForTablet) {
 +                    final String fileSuffix = "_tmp_" + ecid.canonical();
 +                    FileStatus[] files = fs.listStatus(new Path(volPath), (path) -> {
 +                      return path.getName().endsWith(fileSuffix);
 +                    });
 +                    if (files.length > 0) {
 +                      for (FileStatus file : files) {
 +                        if (!fs.delete(file.getPath(), false)) {
 +                          LOG.warn("Unable to delete ecid tmp file: {}: ", file.getPath());
 +                        } else {
 +                          LOG.debug("Deleted ecid tmp file: {}", file.getPath());
 +                        }
 +                      }
 +                    }
 +                  }
 +                } catch (IOException e) {
 +                  LOG.error("Exception deleting compaction tmp files for tablet: {}", extent, e);
 +                }
 +              }
 +                }
 +              }
 +            } else {
 +              // TabletMetadata does not exist for the extent. This could be due to a merge or
 +              // split operation. Use the utility to find tmp files at the table level.
 +              deadCompactionDetector.addTableId(extent.tableId());
 +            }
 +          }
 +        }
 +      });
 +    }
 +
 +    compactions.forEach((k, v) -> recordCompletion(k));
 +  }
 +
 +  /**
 +   * Compactors call this method to update the status of an assigned compaction.
 +   *
 +   * @param tinfo trace info
 +   * @param credentials tcredentials object
 +   * @param externalCompactionId compaction id
 +   * @param update compaction status update
 +   * @param timestamp timestamp of the message
 +   * @throws ThriftSecurityException when permission error
 +   */
 +  @Override
 +  public void updateCompactionStatus(TInfo tinfo, TCredentials credentials,
 +      String externalCompactionId, TCompactionStatusUpdate update, long timestamp)
 +      throws ThriftSecurityException {
 +    // do not expect users to call this directly, expect other tservers to call this method
 +    if (!security.canPerformSystemActions(credentials)) {
 +      throw new AccumuloSecurityException(credentials.getPrincipal(),
 +          SecurityErrorCode.PERMISSION_DENIED).asThriftException();
 +    }
 +    LOG.debug("Compaction status update, id: {}, timestamp: {}, update: {}", externalCompactionId,
 +        timestamp, update);
 +    final RunningCompaction rc = RUNNING_CACHE.get(ExternalCompactionId.of(externalCompactionId));
 +    if (null != rc) {
 +      rc.addUpdate(timestamp, update);
 +    }
 +  }
 +
 +  private void recordCompletion(ExternalCompactionId ecid) {
 +    var rc = RUNNING_CACHE.remove(ecid);
 +    if (rc != null) {
 +      completed.put(ecid, rc);
 +    }
 +  }
 +
 +  protected Set<ExternalCompactionId> readExternalCompactionIds() {
 +    try (TabletsMetadata tabletsMetadata =
 +        this.ctx.getAmple().readTablets().forLevel(Ample.DataLevel.USER)
 +            .filter(new HasExternalCompactionsFilter()).fetch(ECOMP).build()) {
 +      return tabletsMetadata.stream().flatMap(tm -> tm.getExternalCompactions().keySet().stream())
 +          .collect(Collectors.toSet());
 +    }
 +  }
 +
 +  /**
 +   * The RUNNING_CACHE set may contain external compactions that are not actually running. This
 +   * method periodically cleans those up.
 +   */
 +  public void cleanUpRunning() {
 +
 +    // grab a snapshot of the ids in the set before reading the metadata table. This is done to
 +    // avoid removing things that are added while reading the metadata.
 +    Set<ExternalCompactionId> idsSnapshot = Set.copyOf(RUNNING_CACHE.keySet());
 +
 +    // grab the ids that are listed as running in the metadata table. It is important that this
 +    // is done after getting the snapshot.
 +    Set<ExternalCompactionId> idsInMetadata = readExternalCompactionIds();
 +
 +    var idsToRemove = Sets.difference(idsSnapshot, idsInMetadata);
 +
 +    // remove ids that are in the running set but not in the metadata table
 +    idsToRemove.forEach(this::recordCompletion);
 +
 +    if (idsToRemove.size() > 0) {
 +      LOG.debug("Removed stale entries from RUNNING_CACHE : {}", idsToRemove);
 +    }
 +  }
 +
 +  /**
 +   * Return information about running compactions
 +   *
 +   * @param tinfo trace info
 +   * @param credentials tcredentials object
 +   * @return map of ECID to TExternalCompaction objects
 +   * @throws ThriftSecurityException permission error
 +   */
 +  @Override
 +  public TExternalCompactionList getRunningCompactions(TInfo tinfo, TCredentials credentials)
 +      throws ThriftSecurityException {
 +    // do not expect users to call this directly, expect other tservers to call this method
 +    if (!security.canPerformSystemActions(credentials)) {
 +      throw new AccumuloSecurityException(credentials.getPrincipal(),
 +          SecurityErrorCode.PERMISSION_DENIED).asThriftException();
 +    }
 +
 +    final TExternalCompactionList result = new TExternalCompactionList();
 +    RUNNING_CACHE.forEach((ecid, rc) -> {
 +      TExternalCompaction trc = new TExternalCompaction();
 +      trc.setGroupName(rc.getGroupName());
 +      trc.setCompactor(rc.getCompactorAddress());
 +      trc.setUpdates(rc.getUpdates());
 +      trc.setJob(rc.getJob());
 +      result.putToCompactions(ecid.canonical(), trc);
 +    });
 +    return result;
 +  }
 +
 +  /**
 +   * Return information about recently completed compactions
 +   *
 +   * @param tinfo trace info
 +   * @param credentials tcredentials object
 +   * @return map of ECID to TExternalCompaction objects
 +   * @throws ThriftSecurityException permission error
 +   */
 +  @Override
 +  public TExternalCompactionList getCompletedCompactions(TInfo tinfo, TCredentials credentials)
 +      throws ThriftSecurityException {
 +    // do not expect users to call this directly, expect other tservers to call this method
 +    if (!security.canPerformSystemActions(credentials)) {
 +      throw new AccumuloSecurityException(credentials.getPrincipal(),
 +          SecurityErrorCode.PERMISSION_DENIED).asThriftException();
 +    }
 +    final TExternalCompactionList result = new TExternalCompactionList();
 +    completed.asMap().forEach((ecid, rc) -> {
 +      TExternalCompaction trc = new TExternalCompaction();
 +      trc.setGroupName(rc.getGroupName());
 +      trc.setCompactor(rc.getCompactorAddress());
 +      trc.setJob(rc.getJob());
 +      trc.setUpdates(rc.getUpdates());
 +      result.putToCompactions(ecid.canonical(), trc);
 +    });
 +    return result;
 +  }
 +
 +  @Override
 +  public void cancel(TInfo tinfo, TCredentials credentials, String externalCompactionId)
 +      throws TException {
 +    var runningCompaction = RUNNING_CACHE.get(ExternalCompactionId.of(externalCompactionId));
 +    var extent = KeyExtent.fromThrift(runningCompaction.getJob().getExtent());
 +    try {
 +      NamespaceId nsId = this.ctx.getNamespaceId(extent.tableId());
 +      if (!security.canCompact(credentials, extent.tableId(), nsId)) {
 +        throw new AccumuloSecurityException(credentials.getPrincipal(),
 +            SecurityErrorCode.PERMISSION_DENIED).asThriftException();
 +      }
 +    } catch (TableNotFoundException e) {
 +      throw new ThriftTableOperationException(extent.tableId().canonical(), null,
 +          TableOperation.COMPACT_CANCEL, TableOperationExceptionType.NOTFOUND, e.getMessage());
 +    }
 +
 +    cancelCompactionOnCompactor(runningCompaction.getCompactorAddress(), externalCompactionId);
 +  }
 +
 +  /* Method exists to be called from test */
 +  public CompactionJobQueues getJobQueues() {
 +    return jobQueues;
 +  }
 +
 +  /* Method exists to be overridden in test to hide static method */
 +  protected List<RunningCompaction> getCompactionsRunningOnCompactors() {
 +    return ExternalCompactionUtil.getCompactionsRunningOnCompactors(this.ctx);
 +  }
 +
 +  /* Method exists to be overridden in test to hide static method */
 +  protected void cancelCompactionOnCompactor(String address, String externalCompactionId) {
 +    HostAndPort hostPort = HostAndPort.fromString(address);
 +    ExternalCompactionUtil.cancelCompaction(this.ctx, hostPort, externalCompactionId);
 +  }
 +
 +  private void deleteEmpty(ZooReaderWriter zoorw, String path)
 +      throws KeeperException, InterruptedException {
 +    try {
 +      LOG.debug("Deleting empty ZK node {}", path);
 +      zoorw.delete(path);
 +    } catch (KeeperException.NotEmptyException e) {
 +      LOG.debug("Failed to delete {} its not empty, likely an expected race 
condition.", path);
 +    }
 +  }
 +
 +  private void cleanUpCompactors() {
 +    final String compactorQueuesPath = this.ctx.getZooKeeperRoot() + Constants.ZCOMPACTORS;
 +
 +    var zoorw = this.ctx.getZooReaderWriter();
 +
 +    try {
 +      var groups = zoorw.getChildren(compactorQueuesPath);
 +
 +      for (String group : groups) {
 +        String qpath = compactorQueuesPath + "/" + group;
 +
 +        var compactors = zoorw.getChildren(qpath);
 +
 +        if (compactors.isEmpty()) {
 +          deleteEmpty(zoorw, qpath);
 +        }
 +
 +        for (String compactor : compactors) {
 +          String cpath = compactorQueuesPath + "/" + group + "/" + compactor;
 +          var lockNodes = zoorw.getChildren(compactorQueuesPath + "/" + group + "/" + compactor);
 +          if (lockNodes.isEmpty()) {
 +            deleteEmpty(zoorw, cpath);
 +          }
 +        }
 +      }
 +
 +    } catch (KeeperException | RuntimeException e) {
 +      LOG.warn("Failed to clean up compactors", e);
 +    } catch (InterruptedException e) {
 +      Thread.currentThread().interrupt();
 +      throw new IllegalStateException(e);
 +    }
 +  }
 +
 +}
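The recurring change across this merge is the migration of org.apache.accumulo.core.util.Retry from TimeUnit-based builder arguments to java.time.Duration. A minimal sketch of the new builder usage follows; attemptWork() and the class name are hypothetical stand-ins for real call sites, not part of the commit:

    import java.time.Duration;

    import org.apache.accumulo.core.util.Retry;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class RetryExample {
      private static final Logger LOG = LoggerFactory.getLogger(RetryExample.class);

      // Sketch only: attemptWork() is a hypothetical operation returning true on success.
      static boolean runWithRetry() throws InterruptedException {
        Retry retry = Retry.builder().maxRetries(5).retryAfter(Duration.ofMillis(100))
            .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(10)).backOffFactor(1.5)
            .logInterval(Duration.ofMinutes(3)).createRetry();
        while (retry.canRetry()) {
          if (attemptWork()) {
            return true;
          }
          retry.useRetry(); // consume one attempt before sleeping
          retry.waitForNextAttempt(LOG, "retrying hypothetical work");
        }
        return false;
      }

      static boolean attemptWork() {
        return false; // placeholder
      }
    }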
diff --cc server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java
index 477a040330,0000000000..2ff3a30386
mode 100644,000000..100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java
@@@ -1,280 -1,0 +1,278 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.manager.compaction.coordinator.commit;
 +
- import static java.util.concurrent.TimeUnit.MILLISECONDS;
- import static java.util.concurrent.TimeUnit.MINUTES;
- import static java.util.concurrent.TimeUnit.SECONDS;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.COMPACTED;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.ECOMP;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED;
 +
++import java.time.Duration;
 +import java.util.HashSet;
 +import java.util.Optional;
 +import java.util.Set;
 +import java.util.stream.Collectors;
 +
 +import org.apache.accumulo.core.dataImpl.KeyExtent;
 +import org.apache.accumulo.core.fate.FateId;
 +import org.apache.accumulo.core.fate.Repo;
 +import org.apache.accumulo.core.metadata.AbstractTabletFile;
 +import org.apache.accumulo.core.metadata.ReferencedTabletFile;
 +import org.apache.accumulo.core.metadata.StoredTabletFile;
 +import org.apache.accumulo.core.metadata.schema.Ample;
 +import org.apache.accumulo.core.metadata.schema.CompactionMetadata;
 +import org.apache.accumulo.core.metadata.schema.DataFileValue;
 +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
 +import org.apache.accumulo.core.metadata.schema.SelectedFiles;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 +import org.apache.accumulo.core.spi.compaction.CompactionKind;
 +import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats;
 +import org.apache.accumulo.core.util.Retry;
 +import org.apache.accumulo.manager.Manager;
 +import org.apache.accumulo.manager.tableOps.ManagerRepo;
 +import org.apache.accumulo.server.ServerContext;
 +import org.apache.hadoop.fs.Path;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.base.Preconditions;
 +
 +public class CommitCompaction extends ManagerRepo {
 +  private static final long serialVersionUID = 1L;
 +  private static final Logger LOG = LoggerFactory.getLogger(CommitCompaction.class);
 +  private final CompactionCommitData commitData;
 +  private final String newDatafile;
 +
 +  public CommitCompaction(CompactionCommitData commitData, String newDatafile) {
 +    this.commitData = commitData;
 +    this.newDatafile = newDatafile;
 +  }
 +
 +  @Override
 +  public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
 +    var ecid = ExternalCompactionId.of(commitData.ecid);
 +    var newFile = Optional.ofNullable(newDatafile).map(f -> ReferencedTabletFile.of(new Path(f)));
 +
 +    // ELASTICITY_TODO is it possible to test this code running a 2nd time, simulating a failure
 +    // and rerun? Maybe fate could have a testing mode where it calls operations multiple times?
 +
 +    // It is possible that when this runs the compaction was previously committed and then the
 +    // process died and now it is running again. In this case commit should do nothing, but it is
 +    // important to still carry on with the rest of the steps after commit. This code ignores the
 +    // fact that a commit may not have happened in the current call and continues for this reason.
 +    TabletMetadata tabletMetadata = commitCompaction(manager.getContext(), ecid, newFile);
 +
 +    String loc = null;
 +    if (tabletMetadata != null && tabletMetadata.getLocation() != null) {
 +      loc = tabletMetadata.getLocation().getHostPortSession();
 +    }
 +
 +    // This will cause the tablet to be reexamined to see if it needs any more compactions.
 +    var extent = KeyExtent.fromThrift(commitData.textent);
 +    manager.getEventCoordinator().event(extent, "Compaction completed %s", extent);
 +
 +    return new PutGcCandidates(commitData, loc);
 +  }
 +
 +  KeyExtent getExtent() {
 +    return KeyExtent.fromThrift(commitData.textent);
 +  }
 +
 +  private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId ecid,
 +      Optional<ReferencedTabletFile> newDatafile) {
 +
 +    var tablet =
 +        ctx.getAmple().readTablet(getExtent(), ECOMP, SELECTED, LOCATION, FILES, COMPACTED, OPID);
 +
-     Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS)
-         .incrementBy(100, MILLISECONDS).maxWait(10, SECONDS).backOffFactor(1.5)
-         .logInterval(3, MINUTES).createRetry();
++    Retry retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100))
++        .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(10)).backOffFactor(1.5)
++        .logInterval(Duration.ofMinutes(3)).createRetry();
 +
 +    while (canCommitCompaction(ecid, tablet)) {
 +      CompactionMetadata ecm = tablet.getExternalCompactions().get(ecid);
 +
 +      // the compacted files should not exist in the tablet already
 +      var tablet2 = tablet;
 +      newDatafile.ifPresent(
 +          newFile -> Preconditions.checkState(!tablet2.getFiles().contains(newFile.insert()),
 +              "File already exists in tablet %s %s", newFile, tablet2.getFiles()));
 +
 +      try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) {
 +        var tabletMutator = tabletsMutator.mutateTablet(getExtent()).requireAbsentOperation()
 +            .requireCompaction(ecid).requireSame(tablet, FILES, LOCATION);
 +
 +        if (ecm.getKind() == CompactionKind.USER || ecm.getKind() == CompactionKind.SELECTOR) {
 +          tabletMutator.requireSame(tablet, SELECTED, COMPACTED);
 +        }
 +
 +        // make the needed updates to the tablet
 +        updateTabletForCompaction(commitData.stats, ecid, tablet, newDatafile, ecm, tabletMutator);
 +
 +        tabletMutator
 +            .submit(tabletMetadata -> !tabletMetadata.getExternalCompactions().containsKey(ecid));
 +
 +        // TODO expensive logging
 +        LOG.debug("Compaction completed {} added {} removed {}", tablet.getExtent(), newDatafile,
 +            ecm.getJobFiles().stream().map(AbstractTabletFile::getFileName)
 +                .collect(Collectors.toList()));
 +
 +        var result = tabletsMutator.process().get(getExtent());
 +        if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) {
 +          break;
 +        } else {
 +          // compaction failed to commit; maybe something changed on the tablet, so let's reread
 +          // the metadata and try again
 +          tablet = result.readMetadata();
 +        }
 +
 +        retry.waitForNextAttempt(LOG, "Failed to commit " + ecid + " for tablet " + getExtent());
 +      } catch (InterruptedException e) {
 +        throw new RuntimeException(e);
 +      }
 +    }
 +
 +    return tablet;
 +  }
 +
 +  private void updateTabletForCompaction(TCompactionStats stats, ExternalCompactionId ecid,
 +      TabletMetadata tablet, Optional<ReferencedTabletFile> newDatafile, CompactionMetadata ecm,
 +      Ample.ConditionalTabletMutator tabletMutator) {
 +    // ELASTICITY_TODO improve logging; adapt to use existing tablet files logging
 +    if (ecm.getKind() == CompactionKind.USER) {
 +      if (tablet.getSelectedFiles().getFiles().equals(ecm.getJobFiles())) {
 +        // all files selected for the user compaction are finished, so the tablet is finished and
 +        // its compaction id needs to be updated.
 +
 +        FateId fateId = tablet.getSelectedFiles().getFateId();
 +
 +        Preconditions.checkArgument(!tablet.getCompacted().contains(fateId),
 +            "Tablet %s unexpected has selected files and compacted columns 
for %s",
 +            tablet.getExtent(), fateId);
 +
 +        // TODO set to trace
 +        LOG.debug("All selected files compacted for {} setting compacted for 
{}",
 +            tablet.getExtent(), tablet.getSelectedFiles().getFateId());
 +
 +        tabletMutator.deleteSelectedFiles();
 +        tabletMutator.putCompacted(fateId);
 +
 +      } else {
 +        // not all of the selected files were finished, so need to add the new file to the
 +        // selected set
 +
 +        Set<StoredTabletFile> newSelectedFileSet =
 +            new HashSet<>(tablet.getSelectedFiles().getFiles());
 +        newSelectedFileSet.removeAll(ecm.getJobFiles());
 +
 +        if (newDatafile.isPresent()) {
 +          // TODO set to trace
 +          LOG.debug(
 +              "Not all selected files for {} are done, adding new selected 
file {} from compaction",
 +              tablet.getExtent(), 
newDatafile.orElseThrow().getPath().getName());
 +          newSelectedFileSet.add(newDatafile.orElseThrow().insert());
 +        } else {
 +          // TODO set to trace
 +          LOG.debug(
 +              "Not all selected files for {} are done, compaction produced no 
output so not adding to selected set.",
 +              tablet.getExtent());
 +        }
 +
 +        tabletMutator.putSelectedFiles(
 +            new SelectedFiles(newSelectedFileSet, tablet.getSelectedFiles().initiallySelectedAll(),
 +                tablet.getSelectedFiles().getFateId()));
 +      }
 +    }
 +
 +    if (tablet.getLocation() != null) {
 +      // add scan entries to prevent GC in case the hosted tablet is currently using the files
 +      // for scan
 +      ecm.getJobFiles().forEach(tabletMutator::putScan);
 +    }
 +    ecm.getJobFiles().forEach(tabletMutator::deleteFile);
 +    tabletMutator.deleteExternalCompaction(ecid);
 +
 +    if (newDatafile.isPresent()) {
 +      tabletMutator.putFile(newDatafile.orElseThrow(),
 +          new DataFileValue(stats.getFileSize(), stats.getEntriesWritten()));
 +    }
 +  }
 +
 +  // ELASTICITY_TODO unit test this method
 +  public static boolean canCommitCompaction(ExternalCompactionId ecid,
 +      TabletMetadata tabletMetadata) {
 +
 +    if (tabletMetadata == null) {
 +      LOG.debug("Received completion notification for nonexistent tablet {}", 
ecid);
 +      return false;
 +    }
 +
 +    var extent = tabletMetadata.getExtent();
 +
 +    if (tabletMetadata.getOperationId() != null) {
 +      // split, merge, and delete tablet should delete the compaction entry in the tablet
 +      LOG.debug("Received completion notification for tablet with active operation {} {} {}", ecid,
 +          extent, tabletMetadata.getOperationId());
 +      return false;
 +    }
 +
 +    CompactionMetadata ecm = tabletMetadata.getExternalCompactions().get(ecid);
 +
 +    if (ecm == null) {
 +      LOG.debug("Received completion notification for unknown compaction {} 
{}", ecid, extent);
 +      return false;
 +    }
 +
 +    if (ecm.getKind() == CompactionKind.USER || ecm.getKind() == CompactionKind.SELECTOR) {
 +      if (tabletMetadata.getSelectedFiles() == null) {
 +        // when the compaction is canceled, selected files are deleted
 +        LOG.debug(
 +            "Received completion notification for user compaction and tablet 
has no selected files {} {}",
 +            ecid, extent);
 +        return false;
 +      }
 +
 +      if (!ecm.getFateId().equals(tabletMetadata.getSelectedFiles().getFateId())) {
 +        // maybe the compaction was cancelled and another user compaction was started on the
 +        // tablet.
 +        LOG.debug(
 +            "Received completion notification for user compaction where its fate txid did not match the tablets {} {} {} {}",
 +            ecid, extent, ecm.getFateId(), tabletMetadata.getSelectedFiles().getFateId());
 +      }
 +
 +      if (!tabletMetadata.getSelectedFiles().getFiles().containsAll(ecm.getJobFiles())) {
 +        // this is not expected to happen
 +        LOG.error("User compaction contained files not in the selected set {} {} {} {} {}",
 +            tabletMetadata.getExtent(), ecid, ecm.getKind(),
 +            Optional.ofNullable(tabletMetadata.getSelectedFiles()).map(SelectedFiles::getFiles),
 +            ecm.getJobFiles());
 +        return false;
 +      }
 +    }
 +
 +    if (!tabletMetadata.getFiles().containsAll(ecm.getJobFiles())) {
 +      // this is not expected to happen
 +      LOG.error("Compaction contained files not in the tablet files set {} {} 
{} {}",
 +          tabletMetadata.getExtent(), ecid, tabletMetadata.getFiles(), 
ecm.getJobFiles());
 +      return false;
 +    }
 +
 +    return true;
 +  }
 +}
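The commit path above follows a read/check/conditionally-mutate loop that several classes in this merge share: read the tablet metadata, verify preconditions, then submit a conditional mutation that fails if the checked columns changed, rereading and retrying on rejection. A stripped-down sketch of that loop, where `ctx`, `extent`, and applyUpdates(...) are hypothetical placeholders rather than code from the commit:

    // Sketch only: ctx, extent, and applyUpdates(...) are placeholders.
    TabletMetadata tablet = ctx.getAmple().readTablet(extent, FILES, LOCATION);
    while (tablet != null) {
      try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) {
        var mutator = tabletsMutator.mutateTablet(extent).requireAbsentOperation()
            .requireSame(tablet, FILES, LOCATION); // fails if the checked columns changed
        applyUpdates(mutator); // hypothetical updates to the tablet
        mutator.submit(tm -> false);
        var result = tabletsMutator.process().get(extent);
        if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) {
          break; // the mutation was applied
        }
        tablet = result.readMetadata(); // something changed; reread and re-evaluate
      }
    }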
diff --cc server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java
index c1849d25dd,562423750b..0210584cb5
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java
@@@ -18,27 -18,16 +18,24 @@@
   */
  package org.apache.accumulo.manager.tableOps.bulkVer2;
  
- import static java.util.concurrent.TimeUnit.MILLISECONDS;
- import static java.util.concurrent.TimeUnit.MINUTES;
- import static java.util.concurrent.TimeUnit.SECONDS;
- 
  import java.io.IOException;
++import java.time.Duration;
  import java.util.Collections;
 +import java.util.Map;
 +import java.util.Optional;
  
  import org.apache.accumulo.core.Constants;
 -import org.apache.accumulo.core.fate.FateTxId;
 +import org.apache.accumulo.core.data.AbstractId;
 +import org.apache.accumulo.core.data.TableId;
 +import org.apache.accumulo.core.fate.FateId;
  import org.apache.accumulo.core.fate.Repo;
  import org.apache.accumulo.core.gc.ReferenceFile;
 -import org.apache.accumulo.core.manager.state.tables.TableState;
  import org.apache.accumulo.core.manager.thrift.BulkImportState;
  import org.apache.accumulo.core.metadata.schema.Ample;
 +import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
 +import org.apache.accumulo.core.util.Retry;
  import org.apache.accumulo.manager.Manager;
  import org.apache.accumulo.manager.tableOps.ManagerRepo;
  import org.apache.accumulo.manager.tableOps.Utils;
@@@ -93,56 -89,4 +90,56 @@@ public class CleanUpBulkImport extends 
      manager.removeBulkImportStatus(info.sourceDir);
      return null;
    }
 +
 +  private static void removeBulkLoadEntries(Ample ample, TableId tableId, FateId fateId,
 +      Text firstSplit, Text lastSplit) {
 +
-     Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS)
-         .incrementBy(100, MILLISECONDS).maxWait(1, SECONDS).backOffFactor(1.5)
-         .logInterval(3, MINUTES).createRetry();
++    Retry retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100))
++        .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(1)).backOffFactor(1.5)
++        .logInterval(Duration.ofMinutes(3)).createRetry();
 +
 +    while (true) {
 +      try (
 +          var tablets = ample.readTablets().forTable(tableId).overlapping(firstSplit, lastSplit)
 +              .checkConsistency().fetch(ColumnType.PREV_ROW, ColumnType.LOADED).build();
 +          var tabletsMutator = ample.conditionallyMutateTablets()) {
 +
 +        for (var tablet : tablets) {
 +          if (tablet.getLoaded().values().stream()
 +              .anyMatch(loadedFateId -> loadedFateId.equals(fateId))) {
 +            var tabletMutator =
 +                tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation();
 +            tablet.getLoaded().entrySet().stream().filter(entry -> entry.getValue().equals(fateId))
 +                .map(Map.Entry::getKey).forEach(tabletMutator::deleteBulkFile);
 +            tabletMutator.submit(tm -> false);
 +          }
 +        }
 +
 +        var results = tabletsMutator.process();
 +
 +        if (results.values().stream()
 +            .anyMatch(condResult -> condResult.getStatus() != Status.ACCEPTED)) {
 +
 +          results.forEach((extent, condResult) -> {
 +            if (condResult.getStatus() != Status.ACCEPTED) {
 +              var metadata = Optional.ofNullable(condResult.readMetadata());
 +              log.debug("Tablet update failed {} {} {} {} ", fateId, extent, 
condResult.getStatus(),
 +                  
metadata.map(TabletMetadata::getOperationId).map(AbstractId::toString)
 +                      .orElse("tablet is gone"));
 +            }
 +          });
 +
 +          try {
 +            retry.waitForNextAttempt(log,
 +                String.format("%s tableId:%s conditional mutations to delete load markers failed.",
 +                    fateId, tableId));
 +          } catch (InterruptedException e) {
 +            throw new RuntimeException(e);
 +          }
 +        } else {
 +          break;
 +        }
 +      }
 +    }
 +  }
  }
diff --cc server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java
index a4f7928060,0000000000..d2bc8c9b91
mode 100644,000000..100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java
@@@ -1,192 -1,0 +1,190 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.manager.tableOps.bulkVer2;
 +
- import static java.util.concurrent.TimeUnit.MILLISECONDS;
- import static java.util.concurrent.TimeUnit.MINUTES;
- import static java.util.concurrent.TimeUnit.SECONDS;
 +import static java.util.stream.Collectors.groupingBy;
 +import static java.util.stream.Collectors.mapping;
 +import static java.util.stream.Collectors.toList;
 +
++import java.time.Duration;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.concurrent.ExecutionException;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.ThreadPoolExecutor;
 +import java.util.function.Predicate;
 +import java.util.function.Supplier;
 +
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.TableId;
 +import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
 +import org.apache.accumulo.core.fate.FateId;
 +import org.apache.accumulo.core.metadata.TServerInstance;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
 +import org.apache.accumulo.core.rpc.ThriftUtil;
 +import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
 +import org.apache.accumulo.core.tabletserver.thrift.TabletServerClientService;
 +import org.apache.accumulo.core.trace.TraceUtil;
 +import org.apache.accumulo.core.util.Retry;
 +import org.apache.accumulo.server.ServerContext;
 +import org.apache.thrift.TException;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.collect.Iterators;
 +
 +public class TabletRefresher {
 +
 +  private static final Logger log = LoggerFactory.getLogger(TabletRefresher.class);
 +
 +  public static void refresh(ServerContext context,
 +      Supplier<Set<TServerInstance>> onlineTserversSupplier, FateId fateId, TableId tableId,
 +      byte[] startRow, byte[] endRow, Predicate<TabletMetadata> needsRefresh) {
 +
 +    // ELASTICITY_TODO should this thread pool be configurable?
 +    ThreadPoolExecutor threadPool =
 +        context.threadPools().createFixedThreadPool(10, "Tablet refresh " + fateId, false);
 +
 +    try (var tablets = context.getAmple().readTablets().forTable(tableId)
 +        .overlapping(startRow, endRow).checkConsistency()
 +        .fetch(ColumnType.LOADED, ColumnType.LOCATION, ColumnType.PREV_ROW).build()) {
 +
 +      // Find all tablets that need to refresh their metadata. There may be some tablets that were
 +      // hosted after the tablet files were updated; it just results in an unneeded refresh
 +      // request. There may also be tablets that had a location when the files were set but do not
 +      // have a location now; that is ok, the next time that tablet loads somewhere it will see
 +      // the files.
 +
 +      var tabletIterator =
 +          tablets.stream().filter(tabletMetadata -> tabletMetadata.getLocation() != null)
 +              .filter(needsRefresh).iterator();
 +
 +      // avoid reading all tablets into memory and instead process batches of 1000 tablets at a time
 +      Iterators.partition(tabletIterator, 1000).forEachRemaining(batch -> {
 +        var refreshesNeeded = batch.stream().collect(groupingBy(TabletMetadata::getLocation,
 +            mapping(tabletMetadata -> tabletMetadata.getExtent().toThrift(), toList())));
 +
 +        refreshTablets(threadPool, fateId.canonical(), context, onlineTserversSupplier,
 +            refreshesNeeded);
 +      });
 +
 +    } finally {
 +      threadPool.shutdownNow();
 +    }
 +
 +  }
 +
 +  public static void refreshTablets(ExecutorService threadPool, String logId, ServerContext context,
 +      Supplier<Set<TServerInstance>> onlineTserversSupplier,
 +      Map<TabletMetadata.Location,List<TKeyExtent>> refreshesNeeded) {
 +
 +    // make a copy as it will be mutated in this method
 +    refreshesNeeded = new HashMap<>(refreshesNeeded);
 +
-     Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS)
-         .incrementBy(100, MILLISECONDS).maxWait(1, SECONDS).backOffFactor(1.5)
-         .logInterval(3, MINUTES).createRetry();
++    Retry retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100))
++        .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(1)).backOffFactor(1.5)
++        .logInterval(Duration.ofMinutes(3)).createRetry();
 +
 +    while (!refreshesNeeded.isEmpty()) {
 +
 +      Map<TabletMetadata.Location,Future<List<TKeyExtent>>> futures = new HashMap<>();
 +
 +      for (Map.Entry<TabletMetadata.Location,List<TKeyExtent>> entry : refreshesNeeded.entrySet()) {
 +
 +        // Ask tablet server to reload the metadata for these tablets. The tablet server returns
 +        // the list of extents it was hosting but was unable to refresh (the tablets could be in
 +        // the process of loading). If it is not currently hosting the tablet it treats that as
 +        // refreshed and does not return anything for it.
 +        Future<List<TKeyExtent>> future = threadPool
 +            .submit(() -> sendSyncRefreshRequest(context, logId, entry.getKey(), entry.getValue()));
 +
 +        futures.put(entry.getKey(), future);
 +      }
 +
 +      for (Map.Entry<TabletMetadata.Location,Future<List<TKeyExtent>>> entry : futures.entrySet()) {
 +        TabletMetadata.Location location = entry.getKey();
 +        Future<List<TKeyExtent>> future = entry.getValue();
 +
 +        List<TKeyExtent> nonRefreshedExtents = null;
 +        try {
 +          nonRefreshedExtents = future.get();
 +        } catch (InterruptedException | ExecutionException e) {
 +          throw new RuntimeException(e);
 +        }
 +        if (nonRefreshedExtents.isEmpty()) {
 +          // tablet server was able to refresh everything, so remove that location
 +          refreshesNeeded.remove(location);
 +        } else {
 +          // tablet server could not refresh some tablets, try them again later.
 +          refreshesNeeded.put(location, nonRefreshedExtents);
 +        }
 +      }
 +
 +      // look for any tservers that have died since we read the metadata table and remove them
 +      if (!refreshesNeeded.isEmpty()) {
 +        Set<TServerInstance> liveTservers = onlineTserversSupplier.get();
 +
 +        refreshesNeeded.keySet()
 +            .removeIf(location -> !liveTservers.contains(location.getServerInstance()));
 +      }
 +
 +      if (!refreshesNeeded.isEmpty()) {
 +        try {
 +          retry.waitForNextAttempt(log, logId + " waiting for " + refreshesNeeded.size()
 +              + " tservers to refresh their tablets metadata");
 +        } catch (InterruptedException e) {
 +          throw new RuntimeException(e);
 +        }
 +      }
 +    }
 +  }
 +
 +  private static List<TKeyExtent> sendSyncRefreshRequest(ServerContext context, String logId,
 +      TabletMetadata.Location location, List<TKeyExtent> refreshes) {
 +    TabletServerClientService.Client client = null;
 +    try {
 +      log.trace("{} sending refresh request to {} for {} extents", logId, 
location,
 +          refreshes.size());
 +      var timeInMillis = context.getConfiguration().getTimeInMillis(Property.MANAGER_BULK_TIMEOUT);
 +      client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, location.getHostAndPort(),
 +          context, timeInMillis);
 +
 +      var unrefreshed = client.refreshTablets(TraceUtil.traceInfo(), context.rpcCreds(), refreshes);
 +
 +      log.trace("{} refresh request to {} returned {} unrefreshed extents", 
logId, location,
 +          unrefreshed.size());
 +
 +      return unrefreshed;
 +    } catch (TException ex) {
 +      log.debug("rpc failed server: " + location + ", " + logId + " " + 
ex.getMessage(), ex);
 +
 +      // ELASTICITY_TODO are there any other exceptions we should catch in this method and check
 +      // if the tserver is still alive?
 +
 +      // something went wrong w/ the RPC; return all extents as unrefreshed
 +      return refreshes;
 +    } finally {
 +      ThriftUtil.returnClient(client, context);
 +    }
 +  }
 +}
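TabletRefresher bounds memory by walking the tablet iterator in fixed-size batches rather than collecting all tablets first. A small sketch of the same Guava batching idiom; the item type, class name, and handler are arbitrary illustrations, not code from the commit:

    import java.util.Iterator;
    import java.util.List;

    import com.google.common.collect.Iterators;

    public class BatchingExample {
      // Process a large iterator in batches of 1000 without loading everything into memory.
      static void processInBatches(Iterator<String> items) {
        Iterators.partition(items, 1000).forEachRemaining(BatchingExample::handleBatch);
      }

      static void handleBatch(List<String> batch) {
        // hypothetical per-batch work, e.g. grouping by location and sending RPCs
        System.out.println("processing " + batch.size() + " items");
      }
    }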
diff --cc server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
index 9fee825488,992eba99be..b3f373cb16
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
@@@ -18,24 -18,12 +18,22 @@@
   */
  package org.apache.accumulo.manager.tableOps.compact;
  
- import static java.util.concurrent.TimeUnit.MILLISECONDS;
- import static java.util.concurrent.TimeUnit.MINUTES;
- import static java.util.concurrent.TimeUnit.SECONDS;
 -import static java.nio.charset.StandardCharsets.UTF_8;
 -import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.COMPACT_ID;
 -import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.COMPACTED;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.ECOMP;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID;
  import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.USER_COMPACTION_REQUESTED;
  
 -import org.apache.accumulo.core.Constants;
++import java.time.Duration;
 +import java.util.Set;
 +import java.util.concurrent.atomic.AtomicLong;
 +import java.util.function.Consumer;
 +import java.util.function.Predicate;
 +import java.util.stream.Collectors;
 +
 +import org.apache.accumulo.core.client.admin.CompactionConfig;
  import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
  import org.apache.accumulo.core.clientImpl.TableOperationsImpl;
  import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
@@@ -309,77 -171,8 +307,77 @@@ class CompactionDriver extends ManagerR
    }
  
    @Override
 -  public void undo(long tid, Manager environment) {
 +  public void undo(FateId fateId, Manager env) throws Exception {
 +    cleanupTabletMetadata(fateId, env);
 +
 +    // For any compactions that may have happened before this operation failed, attempt to refresh
 +    // tablets.
 +    TabletRefresher.refresh(env.getContext(), env::onlineTabletServers, fateId, tableId, startRow,
 +        endRow, tabletMetadata -> true);
 +  }
  
 +  /**
 +   * Cleans up any tablet metadata that may have been added as part of this compaction operation.
 +   */
 +  private void cleanupTabletMetadata(FateId fateId, Manager manager) throws Exception {
 +    var ample = manager.getContext().getAmple();
 +
 +    // ELASTICITY_TODO use existing compaction logging
 +
 +    boolean allCleanedUp = false;
 +
-     Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS)
-         .incrementBy(100, MILLISECONDS).maxWait(1, SECONDS).backOffFactor(1.5)
-         .logInterval(3, MINUTES).createRetry();
++    Retry retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100))
++        .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(1)).backOffFactor(1.5)
++        .logInterval(Duration.ofMinutes(3)).createRetry();
 +
 +    while (!allCleanedUp) {
 +
 +      AtomicLong rejectedCount = new AtomicLong(0);
 +      Consumer<Ample.ConditionalResult> resultConsumer = result -> {
 +        if (result.getStatus() == Status.REJECTED) {
 +          log.debug("{} update for {} was rejected ", fateId, 
result.getExtent());
 +          rejectedCount.incrementAndGet();
 +        }
 +      };
 +
 +      try (var tablets = ample.readTablets().forTable(tableId).overlapping(startRow, endRow)
 +          .fetch(PREV_ROW, COMPACTED, SELECTED, USER_COMPACTION_REQUESTED).checkConsistency()
 +          .build(); var tabletsMutator = ample.conditionallyMutateTablets(resultConsumer)) {
 +        Predicate<TabletMetadata> needsUpdate =
 +            tabletMetadata -> (tabletMetadata.getSelectedFiles() != null
 +                && tabletMetadata.getSelectedFiles().getFateId().equals(fateId))
 +                || tabletMetadata.getCompacted().contains(fateId)
 +                || tabletMetadata.getUserCompactionsRequested().contains(fateId);
 +        Predicate<TabletMetadata> needsNoUpdate = needsUpdate.negate();
 +
 +        for (TabletMetadata tablet : tablets) {
 +
 +          if (needsUpdate.test(tablet)) {
 +            var mutator = tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation()
 +                .requireSame(tablet, COMPACTED, SELECTED);
 +            if (tablet.getSelectedFiles() != null
 +                && tablet.getSelectedFiles().getFateId().equals(fateId)) {
 +              mutator.deleteSelectedFiles();
 +            }
 +
 +            if (tablet.getCompacted().contains(fateId)) {
 +              mutator.deleteCompacted(fateId);
 +            }
 +            if (tablet.getUserCompactionsRequested().contains(fateId)) {
 +              mutator.deleteUserCompactionRequested(fateId);
 +            }
 +
 +            mutator.submit(needsNoUpdate::test);
 +          }
 +        }
 +      }
 +
 +      allCleanedUp = rejectedCount.get() == 0;
 +
 +      if (!allCleanedUp) {
 +        retry.waitForNextAttempt(log, "Cleanup metadata for failed compaction " + fateId);
 +      }
 +    }
    }
  
  }
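cleanupTabletMetadata drives its retry loop off a consumer that counts rejected conditional results, looping until a pass completes with no rejections. A stripped-down sketch of that counting pattern; `ample` and the per-tablet mutation work are placeholders here, not code from the commit:

    // Sketch only: ample and the per-tablet mutations are hypothetical placeholders.
    AtomicLong rejectedCount = new AtomicLong(0);
    Consumer<Ample.ConditionalResult> resultConsumer = result -> {
      if (result.getStatus() == Status.REJECTED) {
        rejectedCount.incrementAndGet(); // remember this pass was not fully clean
      }
    };
    try (var tabletsMutator = ample.conditionallyMutateTablets(resultConsumer)) {
      // submit conditional mutations for each tablet needing cleanup ...
    }
    boolean allCleanedUp = rejectedCount.get() == 0; // loop again if anything was rejected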
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 0de617455c,b9c9a39359..63fcbd4c5b
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@@ -28,6 -32,8 +28,7 @@@ import java.io.IOException
  import java.lang.management.ManagementFactory;
  import java.lang.reflect.InvocationTargetException;
  import java.net.UnknownHostException;
+ import java.time.Duration;
 -import java.time.Instant;
  import java.util.ArrayList;
  import java.util.Arrays;
  import java.util.Collection;
