This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new f674e9a0b0 Moves the compaction commit process into FATE (#4109)
f674e9a0b0 is described below

commit f674e9a0b0a2b09090dae3d216c482ef26c0fde9
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Fri Jan 12 19:51:32 2024 -0500

    Moves the compaction commit process into FATE (#4109)
    
    The custom refresh tracking code was removed, and compaction commit was
    moved into a FATE operation with the following four steps.
    
     1. Rename the compaction output file, done in the RenameCompactionFile
        class
     2. Update the metadata table via a conditional mutation, done in the
        CommitCompaction class
     3. Write the GC candidates, done in the PutGcCandidates class
     4. Optionally send an RPC refresh request if the tablet was hosted,
        done in the RefreshTablet class
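
    A minimal, self-contained sketch of the chaining pattern these steps
    use (hypothetical stand-in types, not the actual Accumulo API; the
    real classes implement Repo<Manager> and carry CompactionCommitData):

        public class CompactionCommitChainSketch {
          // Step stands in for FATE's Repo: each step returns the next
          // step to run, or null when the operation is finished.
          interface Step {
            Step run() throws Exception;
          }

          // Real FATE persists the current step before running it, so
          // after a process death the incomplete step simply runs again;
          // completed steps are never re-run.
          static void execute(Step first) throws Exception {
            for (Step s = first; s != null; s = s.run()) {}
          }

          public static void main(String[] args) throws Exception {
            Step refreshTablet = () -> null;               // 4. RPC refresh if hosted
            Step putGcCandidates = () -> refreshTablet;    // 3. write gc candidates
            Step commitCompaction = () -> putGcCandidates; // 2. conditional mutation
            Step renameFile = () -> commitCompaction;      // 1. rename the file
            execute(renameFile);
          }
        }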
    
    There is some follow-on work that still needs to be done to improve
    how this change works with detecting dead compactions.  After that is
    done, these changes should address the problems outlined in #3811 and
    #3802 that were related to process death before adding GC candidates.
    Now that GC candidates are written in a FATE step, a process death
    simply causes the step to run again later.
    
    This currently stores the compaction commit FATE operations in
    zookeeper.  That would not be suitable for a cluster, because
    per-tablet information should never be stored in zookeeper.  However,
    it is fine as a temporary situation in the elasticity branch until
    FATE storage is available in an accumulo table; see #4049 and #3559.
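
    For reference, the coordinator seeds this chain with the first step,
    as shown in the CompactionCoordinator change below; FATE then drives
    the remaining steps even across manager restarts:

        var renameOp = new RenameCompactionFile(
            new CompactionCommitData(ecid, extent, ecm, stats));
        var txid = localFate.startTransaction();
        localFate.seedTransaction("COMMIT_COMPACTION", txid, renameOp, true,
            "Commit compaction " + ecid);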
---
 .../apache/accumulo/core/metadata/RootTable.java   |   4 -
 .../accumulo/core/metadata/schema/Ample.java       |  43 --
 .../core/metadata/schema/MetadataSchema.java       |  13 -
 .../accumulo/server/init/ZooKeeperInitializer.java |   3 -
 .../accumulo/server/metadata/RefreshesImpl.java    | 254 -----------
 .../accumulo/server/metadata/ServerAmpleImpl.java  |   6 -
 .../java/org/apache/accumulo/manager/Manager.java  |   3 +-
 .../coordinator/CompactionCoordinator.java         | 481 ++-------------------
 .../coordinator/commit/CommitCompaction.java       | 281 ++++++++++++
 .../coordinator/commit/CompactionCommitData.java   |  62 +++
 .../coordinator/commit/PutGcCandidates.java        |  51 +++
 .../coordinator/commit/RefreshTablet.java          |  64 +++
 .../coordinator/commit/RenameCompactionFile.java   |  92 ++++
 .../accumulo/test/compaction/RefreshesIT.java      |  97 -----
 14 files changed, 595 insertions(+), 859 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/RootTable.java b/core/src/main/java/org/apache/accumulo/core/metadata/RootTable.java
index ce3c654908..7c00cc0e81 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/RootTable.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/RootTable.java
@@ -43,10 +43,6 @@ public class RootTable {
    * ZK path relative to the zookeeper node where the root tablet gc candidates are stored.
    */
   public static final String ZROOT_TABLET_GC_CANDIDATES = ZROOT_TABLET + "/gc_candidates";
-  /*
-   * ZK path relative to the zookeeper node where the root tablet refresh entries are stored.
-   */
-  public static final String ZROOT_TABLET_REFRESHES = ZROOT_TABLET + "/refreshes";
 
   public static final KeyExtent EXTENT = new KeyExtent(ID, null, null);
   public static final KeyExtent OLD_EXTENT =
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
index 98666a74c8..9bcaa83ef7 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
@@ -21,7 +21,6 @@ package org.apache.accumulo.core.metadata.schema;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 import java.util.function.Consumer;
@@ -675,46 +674,4 @@ public interface Ample {
   default void removeBulkLoadInProgressFlag(String path) {
     throw new UnsupportedOperationException();
   }
-
-  interface Refreshes {
-    static class RefreshEntry {
-      private final ExternalCompactionId ecid;
-
-      private final KeyExtent extent;
-      private final TServerInstance tserver;
-
-      public RefreshEntry(ExternalCompactionId ecid, KeyExtent extent, TServerInstance tserver) {
-        this.ecid = Objects.requireNonNull(ecid);
-        this.extent = Objects.requireNonNull(extent);
-        this.tserver = Objects.requireNonNull(tserver);
-      }
-
-      public ExternalCompactionId getEcid() {
-        return ecid;
-      }
-
-      public KeyExtent getExtent() {
-        return extent;
-      }
-
-      public TServerInstance getTserver() {
-        return tserver;
-      }
-    }
-
-    void add(Collection<RefreshEntry> entries);
-
-    void delete(Collection<RefreshEntry> entries);
-
-    Stream<RefreshEntry> stream();
-  }
-
-  /**
-   * Refresh entries in the metadata table are used to track hosted tablets that need to have their
-   * metadata refreshed after a compaction. These entries ensure the refresh happens even in the
-   * case of process death.
-   */
-  default Refreshes refreshes(DataLevel dataLevel) {
-    throw new UnsupportedOperationException();
-  }
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
index a69651e3d0..ceecfb0a77 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
@@ -533,17 +533,4 @@ public class MetadataSchema {
       return section.getRowPrefix();
     }
   }
-
-  public static class RefreshSection {
-    private static final Section section =
-        new Section(RESERVED_PREFIX + "refresh", true, RESERVED_PREFIX + "refresi", false);
-
-    public static Range getRange() {
-      return section.getRange();
-    }
-
-    public static String getRowPrefix() {
-      return section.getRowPrefix();
-    }
-  }
 }
diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/ZooKeeperInitializer.java b/server/base/src/main/java/org/apache/accumulo/server/init/ZooKeeperInitializer.java
index cbf59bc52c..719f468032 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/init/ZooKeeperInitializer.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/init/ZooKeeperInitializer.java
@@ -47,7 +47,6 @@ import org.apache.accumulo.server.conf.codec.VersionedPropCodec;
 import org.apache.accumulo.server.conf.codec.VersionedProperties;
 import org.apache.accumulo.server.conf.store.SystemPropKey;
 import org.apache.accumulo.server.log.WalStateManager;
-import org.apache.accumulo.server.metadata.RefreshesImpl;
 import org.apache.accumulo.server.metadata.RootGcCandidates;
 import org.apache.accumulo.server.tables.TableManager;
 import org.apache.zookeeper.KeeperException;
@@ -137,8 +136,6 @@ public class ZooKeeperInitializer {
         ZooUtil.NodeExistsPolicy.FAIL);
     zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET_GC_CANDIDATES,
         new RootGcCandidates().toJson().getBytes(UTF_8), ZooUtil.NodeExistsPolicy.FAIL);
-    zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET_REFRESHES,
-        RefreshesImpl.getInitialJson().getBytes(UTF_8), ZooUtil.NodeExistsPolicy.FAIL);
     zoo.putPersistentData(zkInstanceRoot + Constants.ZMANAGERS, EMPTY_BYTE_ARRAY,
         ZooUtil.NodeExistsPolicy.FAIL);
     zoo.putPersistentData(zkInstanceRoot + Constants.ZMANAGER_LOCK, EMPTY_BYTE_ARRAY,
diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/RefreshesImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/RefreshesImpl.java
deleted file mode 100644
index 531136853b..0000000000
--- a/server/base/src/main/java/org/apache/accumulo/server/metadata/RefreshesImpl.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.server.metadata;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.accumulo.core.metadata.RootTable.ZROOT_TABLET_REFRESHES;
-import static org.apache.accumulo.core.util.LazySingletons.GSON;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Objects;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.MutationsRejectedException;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.dataImpl.KeyExtent;
-import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.core.metadata.TServerInstance;
-import org.apache.accumulo.core.metadata.schema.Ample;
-import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.RefreshSection;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.server.ServerContext;
-import org.apache.hadoop.io.Text;
-import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-
-public class RefreshesImpl implements Ample.Refreshes {
-
-  private static final Logger log = LoggerFactory.getLogger(RefreshesImpl.class);
-
-  private final Ample.DataLevel dataLevel;
-  private final ServerContext context;
-
-  public static String getInitialJson() {
-    return toJson(Map.of());
-  }
-
-  // the expected version of serialized data
-  private static final int CURRENT_VERSION = 1;
-
-  // This class is used to serialize and deserialize root tablet metadata using GSon. Any changes to
-  // this class must consider persisted data.
-  private static class Data {
-
-    final int version;
-
-    final Map<String,String> entries;
-
-    public Data(int version, Map<String,String> entries) {
-      this.version = version;
-      this.entries = entries;
-    }
-  }
-
-  private static String toJson(Map<String,String> entries) {
-    var data = new Data(CURRENT_VERSION, Objects.requireNonNull(entries));
-    return GSON.get().toJson(data, Data.class);
-  }
-
-  private Map<String,String> fromJson(String json) {
-    Data data = GSON.get().fromJson(json, Data.class);
-    Preconditions.checkArgument(data.version == CURRENT_VERSION, "Expected version %s saw %s",
-        CURRENT_VERSION, data.version);
-    Objects.requireNonNull(data.entries);
-    return data.entries;
-  }
-
-  public RefreshesImpl(ServerContext context, Ample.DataLevel dataLevel) {
-    this.context = context;
-    this.dataLevel = dataLevel;
-  }
-
-  private void mutateRootRefreshes(Consumer<Map<String,String>> mutator) {
-    String zpath = context.getZooKeeperRoot() + ZROOT_TABLET_REFRESHES;
-    try {
-      context.getZooReaderWriter().mutateExisting(zpath, currVal -> {
-        String currJson = new String(currVal, UTF_8);
-        log.debug("Root refreshes before change : {}", currJson);
-        Map<String,String> entries = fromJson(currJson);
-        mutator.accept(entries);
-        String newJson = toJson(entries);
-        log.debug("Root refreshes after change  : {}", newJson);
-        if (newJson.length() > 262_144) {
-          log.warn("Root refreshes stored in ZK at {} are getting large ({} 
bytes)."
-              + " Large nodes may cause problems for Zookeeper!", zpath, 
newJson.length());
-        }
-        return newJson.getBytes(UTF_8);
-      });
-    } catch (Exception e) {
-      throw new IllegalStateException(e);
-    }
-  }
-
-  private Text createRow(RefreshEntry entry) {
-    return new Text(RefreshSection.getRowPrefix() + entry.getEcid().canonical());
-  }
-
-  private Mutation createAddMutation(RefreshEntry entry) {
-    Mutation m = new Mutation(createRow(entry));
-
-    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        DataOutputStream dos = new DataOutputStream(baos)) {
-      dos.writeInt(1);
-      entry.getExtent().writeTo(dos);
-      dos.writeUTF(entry.getTserver().getHostPortSession());
-      dos.close();
-      m.at().family("").qualifier("").put(baos.toByteArray());
-    } catch (IOException e) {
-      throw new UncheckedIOException(e);
-    }
-
-    return m;
-  }
-
-  private Mutation createDeleteMutation(RefreshEntry entry) {
-    Mutation m = new Mutation(createRow(entry));
-    m.putDelete("", "");
-    return m;
-  }
-
-  private void requireRootTablets(Collection<RefreshEntry> entries) {
-    if (!entries.stream().allMatch(e -> e.getExtent().isRootTablet())) {
-      var nonRootTablets = entries.stream().map(RefreshEntry::getExtent)
-          .filter(e -> !e.isRootTablet()).collect(Collectors.toSet());
-      throw new IllegalArgumentException("Expected only root tablet but saw " 
+ nonRootTablets);
-    }
-  }
-
-  private RefreshEntry decode(Map.Entry<Key,Value> entry) {
-    String row = entry.getKey().getRowData().toString();
-    Preconditions.checkArgument(row.startsWith(RefreshSection.getRowPrefix()),
-        "Row %s did not start with %s", row, RefreshSection.getRowPrefix());
-    Preconditions.checkArgument(entry.getKey().getColumnFamilyData().length() == 0,
-        "Expected empty family but saw %s", entry.getKey().getColumnFamilyData());
-    Preconditions.checkArgument(entry.getKey().getColumnQualifierData().length() == 0,
-        "Expected empty qualifier but saw %s", entry.getKey().getColumnQualifierData());
-
-    try (ByteArrayInputStream bais = new ByteArrayInputStream(entry.getValue().get());
-        DataInputStream dis = new DataInputStream(bais)) {
-      var version = dis.readInt();
-      Preconditions.checkArgument(version == CURRENT_VERSION, "Expected version %s saw %s",
-          CURRENT_VERSION, version);
-      var extent = KeyExtent.readFrom(dis);
-      var tserver = new TServerInstance(dis.readUTF());
-      return new RefreshEntry(
-          ExternalCompactionId.of(row.substring(RefreshSection.getRowPrefix().length())), extent,
-          tserver);
-    } catch (IOException e) {
-      throw new UncheckedIOException(e);
-    }
-  }
-
-  @Override
-  public void add(Collection<RefreshEntry> entries) {
-    Objects.requireNonNull(entries);
-    if (dataLevel == Ample.DataLevel.ROOT) {
-      // expect all of these to be the root tablet, verifying because its not stored
-      requireRootTablets(entries);
-      Consumer<Map<String,String>> mutator = map -> 
entries.forEach(refreshEntry -> map
-          .put(refreshEntry.getEcid().canonical(), 
refreshEntry.getTserver().getHostPortSession()));
-      mutateRootRefreshes(mutator);
-    } else {
-      try (BatchWriter writer = context.createBatchWriter(dataLevel.metaTable())) {
-        for (RefreshEntry entry : entries) {
-          writer.addMutation(createAddMutation(entry));
-        }
-      } catch (MutationsRejectedException | TableNotFoundException e) {
-        throw new IllegalStateException(e);
-      }
-    }
-  }
-
-  @Override
-  public void delete(Collection<RefreshEntry> entries) {
-    Objects.requireNonNull(entries);
-    if (dataLevel == Ample.DataLevel.ROOT) {
-      // expect all of these to be the root tablet, verifying because its not stored
-      requireRootTablets(entries);
-      Consumer<Map<String,String>> mutator =
-          map -> entries.forEach(refreshEntry -> map.remove(refreshEntry.getEcid().canonical()));
-      mutateRootRefreshes(mutator);
-    } else {
-      try (BatchWriter writer = context.createBatchWriter(dataLevel.metaTable())) {
-        for (RefreshEntry entry : entries) {
-          writer.addMutation(createDeleteMutation(entry));
-        }
-      } catch (MutationsRejectedException | TableNotFoundException e) {
-        throw new IllegalStateException(e);
-      }
-    }
-  }
-
-  @Override
-  public Stream<RefreshEntry> stream() {
-    if (dataLevel == Ample.DataLevel.ROOT) {
-      var zooReader = context.getZooReader();
-      byte[] jsonBytes;
-      try {
-        jsonBytes =
-            zooReader.getData(context.getZooKeeperRoot() + RootTable.ZROOT_TABLET_REFRESHES);
-        return fromJson(new String(jsonBytes, UTF_8)).entrySet().stream()
-            .map(e -> new RefreshEntry(ExternalCompactionId.of(e.getKey()), RootTable.EXTENT,
-                new TServerInstance(e.getValue())));
-      } catch (KeeperException | InterruptedException e) {
-        throw new IllegalStateException(e);
-      }
-    } else {
-      Range range = RefreshSection.getRange();
-
-      Scanner scanner;
-      try {
-        scanner = context.createScanner(dataLevel.metaTable(), Authorizations.EMPTY);
-      } catch (TableNotFoundException e) {
-        throw new IllegalStateException(e);
-      }
-      scanner.setRange(range);
-      return scanner.stream().onClose(scanner::close).map(this::decode);
-    }
-  }
-}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
index 316082b1d4..8e0beae740 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
@@ -338,10 +338,4 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample {
       throw new IllegalStateException(e);
     }
   }
-
-  @Override
-  public Refreshes refreshes(DataLevel dataLevel) {
-    return new RefreshesImpl(context, dataLevel);
-  }
-
 }
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index cfb311f383..6bba307b05 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -941,7 +941,8 @@ public class Manager extends AbstractServer
     // Start the Manager's Fate Service
     fateServiceHandler = new FateServiceHandler(this);
     managerClientHandler = new ManagerClientServiceHandler(this);
-    compactionCoordinator = new CompactionCoordinator(context, tserverSet, security, nextEvent);
+    compactionCoordinator =
+        new CompactionCoordinator(context, tserverSet, security, nextEvent, fateRefs);
     // Start the Manager's Client service
     // Ensure that calls before the manager gets the lock fail
     ManagerClientService.Iface haProxy =
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index 41196776c2..3deae9424a 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -26,8 +26,6 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType
 import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
 import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
 import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID;
-import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
-import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SCANS;
 import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED;
 
 import java.io.FileNotFoundException;
@@ -36,22 +34,16 @@ import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.EnumMap;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
@@ -76,25 +68,21 @@ import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.NamespaceId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
-import org.apache.accumulo.core.fate.FateTxId;
+import org.apache.accumulo.core.fate.Fate;
+import org.apache.accumulo.core.fate.FateInstanceType;
 import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.core.iterators.user.HasExternalCompactionsFilter;
 import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil;
-import org.apache.accumulo.core.metadata.AbstractTabletFile;
 import org.apache.accumulo.core.metadata.CompactableFileImpl;
 import org.apache.accumulo.core.metadata.ReferencedTabletFile;
 import org.apache.accumulo.core.metadata.StoredTabletFile;
 import org.apache.accumulo.core.metadata.schema.Ample;
-import org.apache.accumulo.core.metadata.schema.Ample.Refreshes.RefreshEntry;
 import org.apache.accumulo.core.metadata.schema.Ample.RejectionHandler;
 import org.apache.accumulo.core.metadata.schema.CompactionMetadata;
-import org.apache.accumulo.core.metadata.schema.DataFileValue;
 import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
-import org.apache.accumulo.core.metadata.schema.SelectedFiles;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
 import org.apache.accumulo.core.metrics.MetricsProducer;
-import org.apache.accumulo.core.rpc.ThriftUtil;
 import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
 import org.apache.accumulo.core.spi.compaction.CompactionJob;
 import org.apache.accumulo.core.spi.compaction.CompactionKind;
@@ -103,7 +91,6 @@ import org.apache.accumulo.core.tabletserver.thrift.IteratorConfig;
 import org.apache.accumulo.core.tabletserver.thrift.TCompactionKind;
 import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats;
 import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
-import org.apache.accumulo.core.tabletserver.thrift.TabletServerClientService;
 import org.apache.accumulo.core.util.Retry;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.util.cache.Caches.CacheName;
@@ -114,8 +101,11 @@ import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.core.util.threads.Threads;
 import org.apache.accumulo.core.volume.Volume;
 import org.apache.accumulo.manager.EventCoordinator;
+import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.manager.compaction.coordinator.commit.CommitCompaction;
+import org.apache.accumulo.manager.compaction.coordinator.commit.CompactionCommitData;
+import org.apache.accumulo.manager.compaction.coordinator.commit.RenameCompactionFile;
 import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues;
-import org.apache.accumulo.manager.tableOps.bulkVer2.TabletRefresher;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.compaction.CompactionConfigStorage;
 import org.apache.accumulo.server.compaction.CompactionPluginUtils;
@@ -135,10 +125,8 @@ import com.github.benmanes.caffeine.cache.CacheLoader;
 import com.github.benmanes.caffeine.cache.LoadingCache;
 import com.github.benmanes.caffeine.cache.Weigher;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterators;
 import com.google.common.collect.Sets;
 import com.google.common.net.HostAndPort;
-import com.google.common.util.concurrent.MoreExecutors;
 
 import io.micrometer.core.instrument.Gauge;
 import io.micrometer.core.instrument.MeterRegistry;
@@ -159,22 +147,15 @@ public class CompactionCoordinator
   protected static final Map<ExternalCompactionId,RunningCompaction> RUNNING_CACHE =
       new ConcurrentHashMap<>();
 
-  /*
-   * When the manager starts up any refreshes that were in progress when the last manager process
-   * died must be completed before new refresh entries are written. This map of countdown latches
-   * helps achieve that goal.
-   */
-  private final Map<Ample.DataLevel,CountDownLatch> refreshLatches;
-
   /* Map of group name to last time compactor called to get a compaction job */
   // ELASTICITY_TODO need to clean out groups that are no longer configured..
   private static final Map<String,Long> TIME_COMPACTOR_LAST_CHECKED = new ConcurrentHashMap<>();
 
   private final ServerContext ctx;
-  private final LiveTServerSet tserverSet;
   private final SecurityOperation security;
   private final CompactionJobQueues jobQueues;
   private final EventCoordinator eventCoordinator;
+  private final AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances;
   // Exposed for tests
   protected volatile Boolean shutdown = false;
 
@@ -188,9 +169,9 @@ public class CompactionCoordinator
   private final QueueMetrics queueMetrics;
 
   public CompactionCoordinator(ServerContext ctx, LiveTServerSet tservers,
-      SecurityOperation security, EventCoordinator eventCoordinator) {
+      SecurityOperation security, EventCoordinator eventCoordinator,
+      AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances) {
     this.ctx = ctx;
-    this.tserverSet = tservers;
     this.schedExecutor = this.ctx.getScheduledExecutor();
     this.security = security;
     this.eventCoordinator = eventCoordinator;
@@ -200,11 +181,7 @@ public class CompactionCoordinator
 
     this.queueMetrics = new QueueMetrics(jobQueues);
 
-    var refreshLatches = new EnumMap<Ample.DataLevel,CountDownLatch>(Ample.DataLevel.class);
-    refreshLatches.put(Ample.DataLevel.ROOT, new CountDownLatch(1));
-    refreshLatches.put(Ample.DataLevel.METADATA, new CountDownLatch(1));
-    refreshLatches.put(Ample.DataLevel.USER, new CountDownLatch(1));
-    this.refreshLatches = Collections.unmodifiableMap(refreshLatches);
+    this.fateInstances = fateInstances;
 
     completed = ctx.getCaches().createNewBuilder(CacheName.COMPACTIONS_COMPLETED, true)
         .maximumSize(200).expireAfterWrite(10, TimeUnit.MINUTES).build();
@@ -262,59 +239,9 @@ public class CompactionCoordinator
     ThreadPools.watchNonCriticalScheduledTask(future);
   }
 
-  private void processRefreshes(Ample.DataLevel dataLevel) {
-    try (var refreshStream = ctx.getAmple().refreshes(dataLevel).stream()) {
-      // process batches of refresh entries to avoid reading all into memory at once
-      Iterators.partition(refreshStream.iterator(), 10000).forEachRemaining(refreshEntries -> {
-        LOG.info("Processing {} tablet refreshes for {}", refreshEntries.size(), dataLevel);
-
-        var extents =
-            refreshEntries.stream().map(RefreshEntry::getExtent).collect(Collectors.toList());
-        var tabletsMeta = new HashMap<KeyExtent,TabletMetadata>();
-        try (var tablets = ctx.getAmple().readTablets().forTablets(extents, Optional.empty())
-            .fetch(PREV_ROW, LOCATION, SCANS).build()) {
-          tablets.stream().forEach(tm -> tabletsMeta.put(tm.getExtent(), tm));
-        }
-
-        var tserverRefreshes = new HashMap<TabletMetadata.Location,List<TKeyExtent>>();
-
-        refreshEntries.forEach(refreshEntry -> {
-          var tm = tabletsMeta.get(refreshEntry.getExtent());
-
-          // only need to refresh if the tablet is still on the same tserver instance
-          if (tm != null && tm.getLocation() != null
-              && tm.getLocation().getServerInstance().equals(refreshEntry.getTserver())) {
-            KeyExtent extent = tm.getExtent();
-            Collection<StoredTabletFile> scanfiles = tm.getScans();
-            var ttr = extent.toThrift();
-            tserverRefreshes.computeIfAbsent(tm.getLocation(), k -> new ArrayList<>()).add(ttr);
-          }
-        });
-
-        String logId = "Coordinator:" + dataLevel;
-        ThreadPoolExecutor threadPool =
-            ctx.threadPools().createFixedThreadPool(10, "Tablet refresh " + 
logId, false);
-        try {
-          TabletRefresher.refreshTablets(threadPool, logId, ctx, tserverSet::getCurrentServers,
-              tserverRefreshes);
-        } finally {
-          threadPool.shutdownNow();
-        }
-
-        ctx.getAmple().refreshes(dataLevel).delete(refreshEntries);
-      });
-    }
-    // allow new refreshes to be written now that all preexisting ones are processed
-    refreshLatches.get(dataLevel).countDown();
-  }
-
   @Override
   public void run() {
 
-    processRefreshes(Ample.DataLevel.ROOT);
-    processRefreshes(Ample.DataLevel.METADATA);
-    processRefreshes(Ample.DataLevel.USER);
-
     startCompactionCleaner(schedExecutor);
     startRunningCleaner(schedExecutor);
 
@@ -669,58 +596,6 @@ public class CompactionCoordinator
     return this;
   }
 
-  class RefreshWriter {
-
-    private final ExternalCompactionId ecid;
-    private final KeyExtent extent;
-
-    private RefreshEntry writtenEntry;
-
-    RefreshWriter(ExternalCompactionId ecid, KeyExtent extent) {
-      this.ecid = ecid;
-      this.extent = extent;
-
-      var dataLevel = Ample.DataLevel.of(extent.tableId());
-      try {
-        // Wait for any refresh entries from the previous manager process to be processed before
-        // writing new ones.
-        refreshLatches.get(dataLevel).await();
-      } catch (InterruptedException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    public void writeRefresh(TabletMetadata.Location location) {
-      Objects.requireNonNull(location);
-
-      if (writtenEntry != null) {
-        if (location.getServerInstance().equals(writtenEntry.getTserver())) {
-          // the location was already written so nothing to do
-          return;
-        } else {
-          deleteRefresh();
-        }
-      }
-
-      var entry = new RefreshEntry(ecid, extent, location.getServerInstance());
-
-      ctx.getAmple().refreshes(Ample.DataLevel.of(extent.tableId())).add(List.of(entry));
-
-      LOG.debug("wrote refresh entry for {}", ecid);
-
-      writtenEntry = entry;
-    }
-
-    public void deleteRefresh() {
-      if (writtenEntry != null) {
-        ctx.getAmple().refreshes(Ample.DataLevel.of(extent.tableId()))
-            .delete(List.of(writtenEntry));
-        LOG.debug("deleted refresh entry for {}", ecid);
-        writtenEntry = null;
-      }
-    }
-  }
-
   private Optional<CompactionConfig> getCompactionConfig(CompactionJobQueues.MetaJob metaJob) {
     if (metaJob.getJob().getKind() == CompactionKind.USER
         && metaJob.getTabletMetadata().getSelectedFiles() != null) {
@@ -738,11 +613,6 @@ public class CompactionCoordinator
    * <ol>
    * <li>Reads the tablets metadata and determines if the compaction can commit. Its possible that
    * things changed while the compaction was running and it can no longer commit.</li>
-   * <li>If the compaction can commit then a ~refresh entry may be written to the metadata table.
-   * This is done before attempting to commit to cover the case of process failure after commit. If
-   * the manager dies after commit then when it restarts it will see the ~refresh entry and refresh
-   * that tablet. The ~refresh entry is only written when its a system compaction on a tablet with a
-   * location.</li>
    * <li>Commit the compaction using a conditional mutation. If the tablets files or location
    * changed since reading the tablets metadata, then conditional mutation will fail. When this
    * happens it will reread the metadata and go back to step 1 conceptually. When committing a
@@ -752,7 +622,6 @@ public class CompactionCoordinator
    * <li>After successful commit a refresh request is sent to the tablet if it has a location. This
    * will cause the tablet to start using the newly compacted files for future scans. Also the
    * tablet can delete the scan entries if there are no active scans using them.</li>
-   * <li>If a ~refresh entry was written, delete it since the refresh was successful.</li>
    * </ol>
    *
    * <p>
@@ -762,21 +631,6 @@ public class CompactionCoordinator
    * refresh was actually done. Therefore, user compactions will refresh as part of the fate
    * operation so that it's known to be done before the fate operation returns. Since the fate
    * operation will do it, there is no need to do it here for user compactions.
-   * </p>
-   *
-   * <p>
-   * The ~refresh entries serve a similar purpose to FATE operations, it ensures that code executes
-   * even when a process dies. FATE was intentionally not used for compaction commit because FATE
-   * stores its data in zookeeper. The refresh entry is stored in the metadata table, which is much
-   * more scalable than zookeeper. The number of system compactions of small files could be large
-   * and this would be a large number of writes to zookeeper. Zookeeper scales somewhat with reads,
-   * but not with writes.
-   * </p>
-   *
-   * <p>
-   * Issue #3559 was opened to explore the possibility of making compaction commit a fate operation
-   * which would remove the need for the ~refresh section.
-   * </p>
    *
    * @param tinfo trace info
    * @param credentials tcredentials object
@@ -795,7 +649,19 @@ public class CompactionCoordinator
           SecurityErrorCode.PERMISSION_DENIED).asThriftException();
     }
 
+    // maybe fate has not started yet
+    var localFates = fateInstances.get();
+    while (localFates == null) {
+      UtilWaitThread.sleep(100);
+      if (shutdown) {
+        return;
+      }
+      localFates = fateInstances.get();
+    }
+
     var extent = KeyExtent.fromThrift(textent);
+    var localFate = localFates.get(FateInstanceType.fromTableId(extent.tableId()));
+
     LOG.info("Compaction completed, id: {}, stats: {}, extent: {}", 
externalCompactionId, stats,
         extent);
     final var ecid = ExternalCompactionId.of(externalCompactionId);
@@ -803,286 +669,35 @@ public class CompactionCoordinator
     var tabletMeta =
         ctx.getAmple().readTablet(extent, ECOMP, SELECTED, LOCATION, FILES, COMPACTED, OPID);
 
-    if (!canCommitCompaction(ecid, tabletMeta)) {
+    if (!CommitCompaction.canCommitCompaction(ecid, tabletMeta)) {
       return;
     }
 
     CompactionMetadata ecm = tabletMeta.getExternalCompactions().get(ecid);
-
-    // ELASTICITY_TODO this code does not handle race conditions or faults. Need to ensure refresh
-    // happens in the case of manager process death between commit and refresh.
-    ReferencedTabletFile newDatafile =
-        TabletNameGenerator.computeCompactionFileDest(ecm.getCompactTmpName());
-
-    Optional<ReferencedTabletFile> optionalNewFile;
-    try {
-      optionalNewFile = renameOrDeleteFile(stats, ecm, newDatafile);
-    } catch (IOException e) {
-      LOG.warn("Can not commit complete compaction {} because unable to delete 
or rename {} ", ecid,
-          ecm.getCompactTmpName(), e);
-      compactionFailed(Map.of(ecid, extent));
-      return;
-    }
-
-    RefreshWriter refreshWriter = new RefreshWriter(ecid, extent);
-
-    try {
-      tabletMeta = commitCompaction(stats, ecid, tabletMeta, optionalNewFile, refreshWriter);
-    } catch (RuntimeException e) {
-      LOG.warn("Failed to commit complete compaction {} {}", ecid, extent, e);
-      compactionFailed(Map.of(ecid, extent));
-    }
-
-    if (ecm.getKind() != CompactionKind.USER) {
-      refreshTablet(tabletMeta);
-    }
-
-    // if a refresh entry was written, it can be removed after the tablet was refreshed
-    refreshWriter.deleteRefresh();
+    var renameOp = new RenameCompactionFile(new CompactionCommitData(ecid, extent, ecm, stats));
+
+    // ELASTICITY_TODO add tag to fate that ECID can be added to. This solves two problem. First it
+    // defends against starting multiple fate txs for the same thing. This will help the split code
+    // also. Second the tag can be used by the dead compaction detector to ignore committing
+    // compactions. The imple coould hash the key to produce the fate tx id.
+    var txid = localFate.startTransaction();
+    localFate.seedTransaction("COMMIT_COMPACTION", txid, renameOp, true,
+        "Commit compaction " + ecid);
+
+    // ELASTICITY_TODO need to remove this wait. It is here because when the dead compaction
+    // detector ask a compactor what its currently running it expects that cover commit. To remove
+    // this wait would need another way for the dead compaction detector to know about committing
+    // compactions. Could add a tag to the fate tx with the ecid and have dead compaction detector
+    // scan these tags. This wait makes the code running in fate not be fault tolerant because in
+    // the
+    // case of faults the dead compaction detector may remove the compaction entry.
+    localFate.waitForCompletion(txid);
 
     // It's possible that RUNNING might not have an entry for this ecid in the case
     // of a coordinator restart when the Coordinator can't find the TServer for the
     // corresponding external compaction.
     recordCompletion(ecid);
-
-    // This will causes the tablet to be reexamined to see if it needs any more compactions.
-    eventCoordinator.event(extent, "Compaction completed %s", extent);
-  }
-
-  private Optional<ReferencedTabletFile> renameOrDeleteFile(TCompactionStats stats,
-      CompactionMetadata ecm, ReferencedTabletFile newDatafile) throws IOException {
-    if (stats.getEntriesWritten() == 0) {
-      // the compaction produced no output so do not need to rename or add a file to the metadata
-      // table, only delete the input files.
-      if (!ctx.getVolumeManager().delete(ecm.getCompactTmpName().getPath())) {
-        throw new IOException("delete returned false");
-      }
-
-      return Optional.empty();
-    } else {
-      if (!ctx.getVolumeManager().rename(ecm.getCompactTmpName().getPath(),
-          newDatafile.getPath())) {
-        throw new IOException("rename returned false");
-      }
-
-      return Optional.of(newDatafile);
-    }
-  }
-
-  private void refreshTablet(TabletMetadata metadata) {
-    var location = metadata.getLocation();
-    if (location != null) {
-      KeyExtent extent = metadata.getExtent();
-
-      // there is a single tserver and single tablet, do not need a thread pool. The direct executor
-      // will run everything in the current thread
-      ExecutorService executorService = MoreExecutors.newDirectExecutorService();
-      try {
-        TabletRefresher.refreshTablets(executorService,
-            "compaction:" + metadata.getExtent().toString(), ctx, 
tserverSet::getCurrentServers,
-            Map.of(metadata.getLocation(), List.of(extent.toThrift())));
-      } finally {
-        executorService.shutdownNow();
-      }
-    }
-  }
-
-  // ELASTICITY_TODO unit test this method
-  private boolean canCommitCompaction(ExternalCompactionId ecid, TabletMetadata tabletMetadata) {
-
-    if (tabletMetadata == null) {
-      LOG.debug("Received completion notification for nonexistent tablet {}", 
ecid);
-      return false;
-    }
-
-    var extent = tabletMetadata.getExtent();
-
-    if (tabletMetadata.getOperationId() != null) {
-      // split, merge, and delete tablet should delete the compaction entry in the tablet
-      LOG.debug("Received completion notification for tablet with active operation {} {} {}", ecid,
-          extent, tabletMetadata.getOperationId());
-      return false;
-    }
-
-    CompactionMetadata ecm = tabletMetadata.getExternalCompactions().get(ecid);
-
-    if (ecm == null) {
-      LOG.debug("Received completion notification for unknown compaction {} 
{}", ecid, extent);
-      return false;
-    }
-
-    if (ecm.getKind() == CompactionKind.USER || ecm.getKind() == CompactionKind.SELECTOR) {
-      if (tabletMetadata.getSelectedFiles() == null) {
-        // when the compaction is canceled, selected files are deleted
-        LOG.debug(
-            "Received completion notification for user compaction and tablet 
has no selected files {} {}",
-            ecid, extent);
-        return false;
-      }
-
-      if (ecm.getFateTxId() != tabletMetadata.getSelectedFiles().getFateTxId()) {
-        // maybe the compaction was cancled and another user compaction was started on the tablet.
-        LOG.debug(
-            "Received completion notification for user compaction where its 
fate txid did not match the tablets {} {} {} {}",
-            ecid, extent, FateTxId.formatTid(ecm.getFateTxId()),
-            FateTxId.formatTid(tabletMetadata.getSelectedFiles().getFateTxId()));
-      }
-
-      if (!tabletMetadata.getSelectedFiles().getFiles().containsAll(ecm.getJobFiles())) {
-        // this is not expected to happen
-        LOG.error("User compaction contained files not in the selected set {} 
{} {} {} {}",
-            tabletMetadata.getExtent(), ecid, ecm.getKind(),
-            Optional.ofNullable(tabletMetadata.getSelectedFiles()).map(SelectedFiles::getFiles),
-            ecm.getJobFiles());
-        return false;
-      }
-    }
-
-    if (!tabletMetadata.getFiles().containsAll(ecm.getJobFiles())) {
-      // this is not expected to happen
-      LOG.error("Compaction contained files not in the tablet files set {} {} 
{} {}",
-          tabletMetadata.getExtent(), ecid, tabletMetadata.getFiles(), 
ecm.getJobFiles());
-      return false;
-    }
-
-    return true;
-  }
-
-  private TabletMetadata commitCompaction(TCompactionStats stats, ExternalCompactionId ecid,
-      TabletMetadata tablet, Optional<ReferencedTabletFile> newDatafile,
-      RefreshWriter refreshWriter) {
-
-    KeyExtent extent = tablet.getExtent();
-
-    Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS)
-        .incrementBy(100, MILLISECONDS).maxWait(10, SECONDS).backOffFactor(1.5)
-        .logInterval(3, MINUTES).createRetry();
-
-    while (canCommitCompaction(ecid, tablet)) {
-      CompactionMetadata ecm = tablet.getExternalCompactions().get(ecid);
-
-      // the compacted files should not exists in the tablet already
-      var tablet2 = tablet;
-      newDatafile.ifPresent(
-          newFile -> Preconditions.checkState(!tablet2.getFiles().contains(newFile.insert()),
-              "File already exists in tablet %s %s", newFile, tablet2.getFiles()));
-
-      if (tablet.getLocation() != null
-          && tablet.getExternalCompactions().get(ecid).getKind() != CompactionKind.USER) {
-        // Write the refresh entry before attempting to update tablet metadata, this ensures that
-        // refresh will happen even if this process dies. In the case where this process does not
-        // die refresh will happen after commit. User compactions will make refresh calls in their
-        // fate operation, so it does not need to be done here.
-        refreshWriter.writeRefresh(tablet.getLocation());
-      }
-
-      try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) {
-        var tabletMutator = tabletsMutator.mutateTablet(extent).requireAbsentOperation()
-            .requireCompaction(ecid).requireSame(tablet, FILES, LOCATION);
-
-        if (ecm.getKind() == CompactionKind.USER || ecm.getKind() == CompactionKind.SELECTOR) {
-          tabletMutator.requireSame(tablet, SELECTED, COMPACTED);
-        }
-
-        // make the needed updates to the tablet
-        updateTabletForCompaction(stats, ecid, tablet, newDatafile, extent, ecm, tabletMutator);
-
-        tabletMutator
-            .submit(tabletMetadata -> !tabletMetadata.getExternalCompactions().containsKey(ecid));
-
-        // TODO expensive logging
-        LOG.debug("Compaction completed {} added {} removed {}", 
tablet.getExtent(), newDatafile,
-            ecm.getJobFiles().stream().map(AbstractTabletFile::getFileName)
-                .collect(Collectors.toList()));
-
-        // ELASTICITY_TODO check return value and retry, could fail because of race conditions
-        var result = tabletsMutator.process().get(extent);
-        if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) {
-          // compaction was committed, mark the compaction input files for deletion
-          //
-          // ELASTICITIY_TODO in the case of process death the GC candidates would never be added
-          // like #3811. If compaction commit were moved to FATE per #3559 then this would not
-          // be an issue. If compaction commit is never moved to FATE, then this addition could
-          // moved to the compaction refresh process. The compaction refresh process will go away
-          // if compaction commit is moved to FATE, so should only do this if not moving to FATE.
-          ctx.getAmple().putGcCandidates(extent.tableId(), ecm.getJobFiles());
-          break;
-        } else {
-          // compaction failed to commit, maybe something changed on the tablet so lets reread the
-          // metadata and try again
-          tablet = result.readMetadata();
-        }
-
-        retry.waitForNextAttempt(LOG, "Failed to commit " + ecid + " for 
tablet " + extent);
-      } catch (InterruptedException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    return tablet;
-  }
-
-  private void updateTabletForCompaction(TCompactionStats stats, ExternalCompactionId ecid,
-      TabletMetadata tablet, Optional<ReferencedTabletFile> newDatafile, KeyExtent extent,
-      CompactionMetadata ecm, Ample.ConditionalTabletMutator tabletMutator) {
-    // ELASTICITY_TODO improve logging adapt to use existing tablet files logging
-    if (ecm.getKind() == CompactionKind.USER) {
-      if (tablet.getSelectedFiles().getFiles().equals(ecm.getJobFiles())) {
-        // all files selected for the user compactions are finished, so the tablet is finish and
-        // its compaction id needs to be updated.
-
-        long fateTxId = tablet.getSelectedFiles().getFateTxId();
-
-        Preconditions.checkArgument(!tablet.getCompacted().contains(fateTxId),
-            "Tablet %s unexpected has selected files and compacted columns for 
%s",
-            tablet.getExtent(), fateTxId);
-
-        // TODO set to trace
-        LOG.debug("All selected files compcated for {} setting compacted for 
{}",
-            tablet.getExtent(), 
FateTxId.formatTid(tablet.getSelectedFiles().getFateTxId()));
-
-        tabletMutator.deleteSelectedFiles();
-        tabletMutator.putCompacted(fateTxId);
-
-      } else {
-        // not all of the selected files were finished, so need to add the new file to the
-        // selected set
-
-        Set<StoredTabletFile> newSelectedFileSet =
-            new HashSet<>(tablet.getSelectedFiles().getFiles());
-        newSelectedFileSet.removeAll(ecm.getJobFiles());
-
-        if (newDatafile.isPresent()) {
-          // TODO set to trace
-          LOG.debug(
-              "Not all selected files for {} are done, adding new selected 
file {} from compaction",
-              tablet.getExtent(), 
newDatafile.orElseThrow().getPath().getName());
-          newSelectedFileSet.add(newDatafile.orElseThrow().insert());
-        } else {
-          // TODO set to trace
-          LOG.debug(
-              "Not all selected files for {} are done, compaction produced no 
output so not adding to selected set.",
-              tablet.getExtent());
-        }
-
-        tabletMutator.putSelectedFiles(
-            new SelectedFiles(newSelectedFileSet, tablet.getSelectedFiles().initiallySelectedAll(),
-                tablet.getSelectedFiles().getFateTxId()));
-      }
-    }
-
-    if (tablet.getLocation() != null) {
-      // add scan entries to prevent GC in case the hosted tablet is currently using the files for
-      // scan
-      ecm.getJobFiles().forEach(tabletMutator::putScan);
-    }
-    ecm.getJobFiles().forEach(tabletMutator::deleteFile);
-    tabletMutator.deleteExternalCompaction(ecid);
-
-    if (newDatafile.isPresent()) {
-      tabletMutator.putFile(newDatafile.orElseThrow(),
-          new DataFileValue(stats.getFileSize(), stats.getEntriesWritten()));
-    }
+    // ELASTICITY_TODO should above call move into fate code?
   }
 
   @Override
@@ -1330,11 +945,6 @@ public class CompactionCoordinator
     cancelCompactionOnCompactor(runningCompaction.getCompactorAddress(), externalCompactionId);
   }
 
-  /* Method exists to be overridden in test to hide static method */
-  protected String getTServerAddressString(HostAndPort tserverAddress) {
-    return ExternalCompactionUtil.getHostPortString(tserverAddress);
-  }
-
   /* Method exists to be overridden in test to hide static method */
   protected List<RunningCompaction> getCompactionsRunningOnCompactors() {
     return ExternalCompactionUtil.getCompactionsRunningOnCompactors(this.ctx);
@@ -1346,11 +956,6 @@ public class CompactionCoordinator
     ExternalCompactionUtil.cancelCompaction(this.ctx, hostPort, externalCompactionId);
   }
 
-  /* Method exists to be overridden in test to hide static method */
-  protected void returnTServerClient(TabletServerClientService.Client client) {
-    ThriftUtil.returnClient(client, this.ctx);
-  }
-
   private void deleteEmpty(ZooReaderWriter zoorw, String path)
       throws KeeperException, InterruptedException {
     try {
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java
new file mode 100644
index 0000000000..6063a3b38f
--- /dev/null
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager.compaction.coordinator.commit;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.COMPACTED;
+import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.ECOMP;
+import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
+import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
+import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID;
+import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED;
+
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.fate.FateTxId;
+import org.apache.accumulo.core.fate.Repo;
+import org.apache.accumulo.core.metadata.AbstractTabletFile;
+import org.apache.accumulo.core.metadata.ReferencedTabletFile;
+import org.apache.accumulo.core.metadata.StoredTabletFile;
+import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.CompactionMetadata;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.SelectedFiles;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.spi.compaction.CompactionKind;
+import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats;
+import org.apache.accumulo.core.util.Retry;
+import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.manager.tableOps.ManagerRepo;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class CommitCompaction extends ManagerRepo {
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LoggerFactory.getLogger(CommitCompaction.class);
+  private final CompactionCommitData commitData;
+  private final String newDatafile;
+
+  public CommitCompaction(CompactionCommitData commitData, String newDatafile) {
+    this.commitData = commitData;
+    this.newDatafile = newDatafile;
+  }
+
+  @Override
+  public Repo<Manager> call(long tid, Manager manager) throws Exception {
+    var ecid = ExternalCompactionId.of(commitData.ecid);
+    var newFile = Optional.ofNullable(newDatafile).map(f -> ReferencedTabletFile.of(new Path(f)));
+
+    // ELASTICITIY_TODO is it possible to test this code running a 2nd time, simulating a failure
+    // and rerun? Maybe fate could have a testing mode where it calls operations multiple times?
+
+    // It is possible that when this runs that the compaction was previously committed and then the
+    // process died and now its running again. In this case commit should do nothing, but its
+    // important to still carry on with the rest of the steps after commit. This code ignores a that
+    // fact that a commit may not have happened in the current call and continues for this reason.
+    TabletMetadata tabletMetadata = commitCompaction(manager.getContext(), ecid, newFile);
+
+    String loc = null;
+    if (tabletMetadata != null && tabletMetadata.getLocation() != null) {
+      loc = tabletMetadata.getLocation().getHostPortSession();
+    }
+
+    // This will causes the tablet to be reexamined to see if it needs any more compactions.
+    var extent = KeyExtent.fromThrift(commitData.textent);
+    manager.getEventCoordinator().event(extent, "Compaction completed %s", 
extent);
+
+    return new PutGcCandidates(commitData, loc);
+  }
+
+  KeyExtent getExtent() {
+    return KeyExtent.fromThrift(commitData.textent);
+  }
+
+  private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId ecid,
+      Optional<ReferencedTabletFile> newDatafile) {
+
+    var tablet =
+        ctx.getAmple().readTablet(getExtent(), ECOMP, SELECTED, LOCATION, 
FILES, COMPACTED, OPID);
+
+    Retry retry = Retry.builder().infiniteRetries().retryAfter(100, 
MILLISECONDS)
+        .incrementBy(100, MILLISECONDS).maxWait(10, SECONDS).backOffFactor(1.5)
+        .logInterval(3, MINUTES).createRetry();
+
+    while (canCommitCompaction(ecid, tablet)) {
+      CompactionMetadata ecm = tablet.getExternalCompactions().get(ecid);
+
+      // the compacted files should not already exist in the tablet
+      var tablet2 = tablet;
+      newDatafile.ifPresent(
+          newFile -> 
Preconditions.checkState(!tablet2.getFiles().contains(newFile.insert()),
+              "File already exists in tablet %s %s", newFile, 
tablet2.getFiles()));
+
+      try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) {
+        var tabletMutator = 
tabletsMutator.mutateTablet(getExtent()).requireAbsentOperation()
+            .requireCompaction(ecid).requireSame(tablet, FILES, LOCATION);
+
+        if (ecm.getKind() == CompactionKind.USER || ecm.getKind() == 
CompactionKind.SELECTOR) {
+          tabletMutator.requireSame(tablet, SELECTED, COMPACTED);
+        }
+
+        // make the needed updates to the tablet
+        updateTabletForCompaction(commitData.stats, ecid, tablet, newDatafile, 
ecm, tabletMutator);
+
+        tabletMutator
+            .submit(tabletMetadata -> 
!tabletMetadata.getExternalCompactions().containsKey(ecid));
+
+        // TODO expensive logging
+        LOG.debug("Compaction completed {} added {} removed {}", 
tablet.getExtent(), newDatafile,
+            ecm.getJobFiles().stream().map(AbstractTabletFile::getFileName)
+                .collect(Collectors.toList()));
+
+        var result = tabletsMutator.process().get(getExtent());
+        if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) {
+          break;
+        } else {
+          // the compaction failed to commit; maybe something changed on the
+          // tablet, so reread the metadata and try again
+          tablet = result.readMetadata();
+        }
+
+        retry.waitForNextAttempt(LOG, "Failed to commit " + ecid + " for 
tablet " + getExtent());
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    return tablet;
+  }
+
+  private void updateTabletForCompaction(TCompactionStats stats, 
ExternalCompactionId ecid,
+      TabletMetadata tablet, Optional<ReferencedTabletFile> newDatafile, 
CompactionMetadata ecm,
+      Ample.ConditionalTabletMutator tabletMutator) {
+    // ELASTICITY_TODO improve logging; adapt to use the existing tablet files logging
+    if (ecm.getKind() == CompactionKind.USER) {
+      if (tablet.getSelectedFiles().getFiles().equals(ecm.getJobFiles())) {
+        // all files selected for the user compaction are finished, so the
+        // tablet is finished and its compacted column needs to be updated
+
+        long fateTxId = tablet.getSelectedFiles().getFateTxId();
+
+        Preconditions.checkArgument(!tablet.getCompacted().contains(fateTxId),
+            "Tablet %s unexpected has selected files and compacted columns for 
%s",
+            tablet.getExtent(), fateTxId);
+
+        // TODO set to trace
+        LOG.debug("All selected files compcated for {} setting compacted for 
{}",
+            tablet.getExtent(), 
FateTxId.formatTid(tablet.getSelectedFiles().getFateTxId()));
+
+        tabletMutator.deleteSelectedFiles();
+        tabletMutator.putCompacted(fateTxId);
+
+      } else {
+        // not all of the selected files were finished, so the new file needs
+        // to be added to the selected set
+
+        Set<StoredTabletFile> newSelectedFileSet =
+            new HashSet<>(tablet.getSelectedFiles().getFiles());
+        newSelectedFileSet.removeAll(ecm.getJobFiles());
+
+        if (newDatafile.isPresent()) {
+          // TODO set to trace
+          LOG.debug(
+              "Not all selected files for {} are done, adding new selected 
file {} from compaction",
+              tablet.getExtent(), 
newDatafile.orElseThrow().getPath().getName());
+          newSelectedFileSet.add(newDatafile.orElseThrow().insert());
+        } else {
+          // TODO set to trace
+          LOG.debug(
+              "Not all selected files for {} are done, compaction produced no 
output so not adding to selected set.",
+              tablet.getExtent());
+        }
+
+        tabletMutator.putSelectedFiles(
+            new SelectedFiles(newSelectedFileSet, 
tablet.getSelectedFiles().initiallySelectedAll(),
+                tablet.getSelectedFiles().getFateTxId()));
+      }
+    }
+
+    if (tablet.getLocation() != null) {
+      // add scan entries to prevent GC in case the hosted tablet is currently 
using the files for
+      // scan
+      ecm.getJobFiles().forEach(tabletMutator::putScan);
+    }
+    ecm.getJobFiles().forEach(tabletMutator::deleteFile);
+    tabletMutator.deleteExternalCompaction(ecid);
+
+    if (newDatafile.isPresent()) {
+      tabletMutator.putFile(newDatafile.orElseThrow(),
+          new DataFileValue(stats.getFileSize(), stats.getEntriesWritten()));
+    }
+  }
+
+  // ELASTICITY_TODO unit test this method
+  public static boolean canCommitCompaction(ExternalCompactionId ecid,
+      TabletMetadata tabletMetadata) {
+
+    if (tabletMetadata == null) {
+      LOG.debug("Received completion notification for nonexistent tablet {}", 
ecid);
+      return false;
+    }
+
+    var extent = tabletMetadata.getExtent();
+
+    if (tabletMetadata.getOperationId() != null) {
+      // split, merge, and delete tablet operations should delete the compaction entry in the tablet
+      LOG.debug("Received completion notification for tablet with active 
operation {} {} {}", ecid,
+          extent, tabletMetadata.getOperationId());
+      return false;
+    }
+
+    CompactionMetadata ecm = tabletMetadata.getExternalCompactions().get(ecid);
+
+    if (ecm == null) {
+      LOG.debug("Received completion notification for unknown compaction {} 
{}", ecid, extent);
+      return false;
+    }
+
+    if (ecm.getKind() == CompactionKind.USER || ecm.getKind() == 
CompactionKind.SELECTOR) {
+      if (tabletMetadata.getSelectedFiles() == null) {
+        // when the compaction is canceled, selected files are deleted
+        LOG.debug(
+            "Received completion notification for user compaction and tablet 
has no selected files {} {}",
+            ecid, extent);
+        return false;
+      }
+
+      if (ecm.getFateTxId() != 
tabletMetadata.getSelectedFiles().getFateTxId()) {
+        // maybe the compaction was canceled and another user compaction was
+        // started on the tablet
+        LOG.debug(
+            "Received completion notification for user compaction where its fate txid did not match the tablets {} {} {} {}",
+            ecid, extent, FateTxId.formatTid(ecm.getFateTxId()),
+            FateTxId.formatTid(tabletMetadata.getSelectedFiles().getFateTxId()));
+        return false;
+      }
+
+      if 
(!tabletMetadata.getSelectedFiles().getFiles().containsAll(ecm.getJobFiles())) {
+        // this is not expected to happen
+        LOG.error("User compaction contained files not in the selected set {} 
{} {} {} {}",
+            tabletMetadata.getExtent(), ecid, ecm.getKind(),
+            
Optional.ofNullable(tabletMetadata.getSelectedFiles()).map(SelectedFiles::getFiles),
+            ecm.getJobFiles());
+        return false;
+      }
+    }
+
+    if (!tabletMetadata.getFiles().containsAll(ecm.getJobFiles())) {
+      // this is not expected to happen
+      LOG.error("Compaction contained files not in the tablet files set {} {} 
{} {}",
+          tabletMetadata.getExtent(), ecid, tabletMetadata.getFiles(), 
ecm.getJobFiles());
+      return false;
+    }
+
+    return true;
+  }
+}
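
As a starting sketch of the unit test the ELASTICITY_TODO above asks for, the
simplest rejection case (the tablet metadata no longer exists) can be covered
without any mocking. The test class name is hypothetical, and it assumes the
test lives in the same package as CommitCompaction:

    import static org.junit.jupiter.api.Assertions.assertFalse;

    import java.util.UUID;

    import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
    import org.junit.jupiter.api.Test;

    public class CanCommitCompactionTest {
      @Test
      public void testNonexistentTablet() {
        var ecid = ExternalCompactionId.generate(UUID.randomUUID());
        // a null TabletMetadata means the tablet no longer exists (e.g. it
        // was deleted or merged away), so the commit must be rejected
        assertFalse(CommitCompaction.canCommitCompaction(ecid, null));
      }
    }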
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CompactionCommitData.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CompactionCommitData.java
new file mode 100644
index 0000000000..23b293c25e
--- /dev/null
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CompactionCommitData.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager.compaction.coordinator.commit;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
+import org.apache.accumulo.core.metadata.StoredTabletFile;
+import org.apache.accumulo.core.metadata.schema.CompactionMetadata;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.spi.compaction.CompactionKind;
+import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats;
+
+public class CompactionCommitData implements Serializable {
+  private static final long serialVersionUID = 1L;
+  final CompactionKind kind;
+  final Set<String> inputPaths;
+  final String outputTmpPath;
+  final String ecid;
+  final TKeyExtent textent;
+  final TCompactionStats stats;
+
+  public CompactionCommitData(ExternalCompactionId ecid, KeyExtent extent, 
CompactionMetadata ecm,
+      TCompactionStats stats) {
+    this.ecid = ecid.canonical();
+    this.textent = extent.toThrift();
+    this.kind = ecm.getKind();
+    this.inputPaths =
+        
ecm.getJobFiles().stream().map(StoredTabletFile::getMetadata).collect(Collectors.toSet());
+    this.outputTmpPath = ecm.getCompactTmpName().getNormalizedPathStr();
+    this.stats = stats;
+  }
+
+  public TableId getTableId() {
+    return KeyExtent.fromThrift(textent).tableId();
+  }
+
+  public Collection<StoredTabletFile> getJobFiles() {
+    return 
inputPaths.stream().map(StoredTabletFile::of).collect(Collectors.toList());
+  }
+}
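
Because FATE serializes each repo between steps, every field of
CompactionCommitData must survive Java serialization. A minimal round-trip
helper (hypothetical, for illustration; it works for any Serializable object,
not just this class) can be used to sanity check that:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;

    public class SerdeCheck {
      // serialize then deserialize, mimicking what FATE does when it
      // persists a repo and later reloads it
      static <T extends Serializable> T roundTrip(T obj) throws Exception {
        var baos = new ByteArrayOutputStream();
        try (var oos = new ObjectOutputStream(baos)) {
          oos.writeObject(obj);
        }
        var bais = new ByteArrayInputStream(baos.toByteArray());
        try (var ois = new ObjectInputStream(bais)) {
          @SuppressWarnings("unchecked")
          T copy = (T) ois.readObject();
          return copy;
        }
      }
    }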
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/PutGcCandidates.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/PutGcCandidates.java
new file mode 100644
index 0000000000..6ce5e37ea7
--- /dev/null
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/PutGcCandidates.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager.compaction.coordinator.commit;
+
+import org.apache.accumulo.core.fate.Repo;
+import org.apache.accumulo.core.spi.compaction.CompactionKind;
+import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.manager.tableOps.ManagerRepo;
+
+public class PutGcCandidates extends ManagerRepo {
+  private static final long serialVersionUID = 1L;
+  private final CompactionCommitData commitData;
+  private final String refreshLocation;
+
+  public PutGcCandidates(CompactionCommitData commitData, String 
refreshLocation) {
+    this.commitData = commitData;
+    this.refreshLocation = refreshLocation;
+  }
+
+  @Override
+  public Repo<Manager> call(long tid, Manager manager) throws Exception {
+
+    // add the GC candidates
+    manager.getContext().getAmple().putGcCandidates(commitData.getTableId(),
+        commitData.getJobFiles());
+
+    if (commitData.kind == CompactionKind.USER || refreshLocation == null) {
+      // user compactions will refresh tablets as part of the FATE operation 
driving the user
+      // compaction, so no need to do it here
+      return null;
+    }
+
+    return new RefreshTablet(commitData.textent, refreshLocation);
+  }
+}
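
Each repo's call() returns the next step, or null to end the operation, so the
whole commit is the chain RenameCompactionFile -> CommitCompaction ->
PutGcCandidates -> RefreshTablet. A toy runner (hypothetical; it ignores
isReady() and the persistence that real FATE provides) shows the control flow:

    import org.apache.accumulo.core.fate.Repo;
    import org.apache.accumulo.manager.Manager;

    public class ToyFateRunner {
      // drive a chain of repos to completion; real FATE persists each repo
      // and resumes from the last incomplete step after a process death
      static void run(Repo<Manager> repo, long tid, Manager manager)
          throws Exception {
        while (repo != null) {
          repo = repo.call(tid, manager);
        }
      }
    }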
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/RefreshTablet.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/RefreshTablet.java
new file mode 100644
index 0000000000..d2044990a0
--- /dev/null
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/RefreshTablet.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager.compaction.coordinator.commit;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
+import org.apache.accumulo.core.fate.Repo;
+import org.apache.accumulo.core.metadata.TServerInstance;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.manager.tableOps.ManagerRepo;
+import org.apache.accumulo.manager.tableOps.bulkVer2.TabletRefresher;
+
+import com.google.common.util.concurrent.MoreExecutors;
+
+public class RefreshTablet extends ManagerRepo {
+  private static final long serialVersionUID = 1L;
+  private final TKeyExtent extent;
+  private final String tserverInstance;
+
+  public RefreshTablet(TKeyExtent extent, String tserverInstance) {
+    this.extent = extent;
+    this.tserverInstance = tserverInstance;
+  }
+
+  @Override
+  public Repo<Manager> call(long tid, Manager manager) throws Exception {
+
+    TServerInstance tsi = new TServerInstance(tserverInstance);
+
+    // there is a single tserver and a single tablet, so a thread pool is not
+    // needed; the direct executor will run everything in the current thread
+    ExecutorService executorService = MoreExecutors.newDirectExecutorService();
+    try {
+      TabletRefresher.refreshTablets(executorService, "compaction:" + 
KeyExtent.fromThrift(extent),
+          manager.getContext(), manager::onlineTabletServers,
+          Map.of(TabletMetadata.Location.current(tsi), List.of(extent)));
+    } finally {
+      executorService.shutdownNow();
+    }
+
+    return null;
+  }
+}
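
The direct executor used above is a Guava utility that runs each submitted
task synchronously in the calling thread, which is why no thread pool is
needed for a single tablet. A standalone snippet (illustrative only)
demonstrates the behavior:

    import java.util.concurrent.ExecutorService;

    import com.google.common.util.concurrent.MoreExecutors;

    public class DirectExecutorDemo {
      public static void main(String[] args) throws Exception {
        ExecutorService direct = MoreExecutors.newDirectExecutorService();
        // the task runs in the submitting thread before submit() returns
        var name = direct.submit(() -> Thread.currentThread().getName());
        // prints true: the task saw the caller's own thread
        System.out.println(name.get().equals(Thread.currentThread().getName()));
        direct.shutdownNow();
      }
    }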
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/RenameCompactionFile.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/RenameCompactionFile.java
new file mode 100644
index 0000000000..2261ae7bf3
--- /dev/null
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/RenameCompactionFile.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager.compaction.coordinator.commit;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.fate.Repo;
+import org.apache.accumulo.core.metadata.ReferencedTabletFile;
+import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.manager.tableOps.ManagerRepo;
+import org.apache.accumulo.server.tablets.TabletNameGenerator;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RenameCompactionFile extends ManagerRepo {
+  private static final Logger log = 
LoggerFactory.getLogger(RenameCompactionFile.class);
+  private static final long serialVersionUID = 1L;
+  private final CompactionCommitData commitData;
+
+  public RenameCompactionFile(CompactionCommitData commitData) {
+    this.commitData = commitData;
+  }
+
+  @Override
+  public Repo<Manager> call(long tid, Manager manager) throws Exception {
+    ReferencedTabletFile newDatafile = null;
+    var ctx = manager.getContext();
+
+    var tmpPath = new Path(commitData.outputTmpPath);
+
+    if (commitData.stats.getEntriesWritten() == 0) {
+      // the compaction produced no output, so there is no need to rename or
+      // add a file to the metadata table; only the temporary file needs to
+      // be deleted
+      try {
+        if (!ctx.getVolumeManager().delete(tmpPath)) {
+          throw new IOException("delete returned false for " + tmpPath);
+        }
+      } catch (IOException ioe) {
+        // Log the original exception, in case the existence check below also
+        // throws and would otherwise hide it.
+        log.debug("Attempting to see if file exists after delete failure of 
{}", tmpPath, ioe);
+        if (!ctx.getVolumeManager().exists(tmpPath)) {
+          log.debug(
+              "Failed to delete file {}, but it does not exists.  Assuming 
this is a 2nd run.",
+              tmpPath, ioe);
+        } else {
+          throw ioe;
+        }
+      }
+    } else {
+      newDatafile = 
TabletNameGenerator.computeCompactionFileDest(ReferencedTabletFile.of(tmpPath));
+      try {
+        if (!ctx.getVolumeManager().rename(tmpPath, newDatafile.getPath())) {
+          throw new IOException("rename returned false for " + tmpPath);
+        }
+      } catch (IOException ioe) {
+        // Log the original exception, in case the existence check below also
+        // throws and would otherwise hide it.
+        log.debug("Attempting to see if file exists after rename failure of {} 
to {}", tmpPath,
+            newDatafile, ioe);
+        if (ctx.getVolumeManager().exists(newDatafile.getPath())
+            && !ctx.getVolumeManager().exists(tmpPath)) {
+          log.debug(
+              "Failed to rename {} to {}, but destination exists and source 
does not.  Assuming this is a 2nd run.",
+              tmpPath, newDatafile, ioe);
+        } else {
+          throw ioe;
+        }
+      }
+    }
+
+    return new CommitCompaction(commitData,
+        newDatafile == null ? null : newDatafile.getNormalizedPathStr());
+  }
+}
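
The rename step follows a common idempotence pattern: attempt the operation,
and if it fails, check whether a previous run already completed it. A
stripped-down sketch of the same pattern using java.nio.file (a hypothetical
helper, not the VolumeManager API used above) looks like:

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;

    public class IdempotentRename {
      // move src to dst, tolerating a rerun where an earlier attempt
      // already performed the move
      static void renameIdempotent(Path src, Path dst) throws IOException {
        try {
          Files.move(src, dst);
        } catch (IOException e) {
          if (Files.exists(dst) && !Files.exists(src)) {
            return; // assume a prior run completed the move
          }
          throw e;
        }
      }
    }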
diff --git 
a/test/src/main/java/org/apache/accumulo/test/compaction/RefreshesIT.java 
b/test/src/main/java/org/apache/accumulo/test/compaction/RefreshesIT.java
deleted file mode 100644
index 053f01d5a3..0000000000
--- a/test/src/main/java/org/apache/accumulo/test/compaction/RefreshesIT.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.test.compaction;
-
-import static java.util.stream.Collectors.toMap;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import org.apache.accumulo.core.data.TableId;
-import org.apache.accumulo.core.dataImpl.KeyExtent;
-import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.core.metadata.TServerInstance;
-import org.apache.accumulo.core.metadata.schema.Ample;
-import org.apache.accumulo.core.metadata.schema.Ample.Refreshes.RefreshEntry;
-import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
-import org.apache.accumulo.test.functional.ConfigurableMacBase;
-import org.apache.hadoop.io.Text;
-import org.junit.jupiter.api.Test;
-
-/**
- * Tests reading and writing refesh entries. This is done as a an IT instead 
of a unit test because
- * it would take too much code to mock ZK and the metadata table.
- */
-public class RefreshesIT extends ConfigurableMacBase {
-  private void testRefreshes(Ample.DataLevel level, KeyExtent extent1, 
KeyExtent extent2) {
-    var refreshes = getServerContext().getAmple().refreshes(level);
-
-    assertEquals(0, refreshes.stream().count());
-
-    var ecid1 = ExternalCompactionId.generate(UUID.randomUUID());
-    var ecid2 = ExternalCompactionId.generate(UUID.randomUUID());
-    var ecid3 = ExternalCompactionId.generate(UUID.randomUUID());
-
-    var tserver1 = new TServerInstance("host1:9997[abcdef123]");
-    var tserver2 = new TServerInstance("host2:9997[1234567890]");
-
-    refreshes.add(List.of(new RefreshEntry(ecid1, extent1, tserver1)));
-
-    assertEquals(Map.of(ecid1, extent1),
-        refreshes.stream().collect(toMap(RefreshEntry::getEcid, 
RefreshEntry::getExtent)));
-    assertEquals(Map.of(ecid1, tserver1),
-        refreshes.stream().collect(toMap(RefreshEntry::getEcid, 
RefreshEntry::getTserver)));
-
-    refreshes.add(List.of(new RefreshEntry(ecid2, extent2, tserver1),
-        new RefreshEntry(ecid3, extent2, tserver2)));
-
-    assertEquals(Map.of(ecid1, extent1, ecid2, extent2, ecid3, extent2),
-        refreshes.stream().collect(toMap(RefreshEntry::getEcid, 
RefreshEntry::getExtent)));
-    assertEquals(Map.of(ecid1, tserver1, ecid2, tserver1, ecid3, tserver2),
-        refreshes.stream().collect(toMap(RefreshEntry::getEcid, 
RefreshEntry::getTserver)));
-
-    refreshes.delete(List.of(new RefreshEntry(ecid2, extent2, tserver1)));
-
-    assertEquals(Map.of(ecid1, extent1, ecid3, extent2),
-        refreshes.stream().collect(toMap(RefreshEntry::getEcid, 
RefreshEntry::getExtent)));
-    assertEquals(Map.of(ecid1, tserver1, ecid3, tserver2),
-        refreshes.stream().collect(toMap(RefreshEntry::getEcid, 
RefreshEntry::getTserver)));
-
-    refreshes.delete(List.of(new RefreshEntry(ecid3, extent2, tserver2),
-        new RefreshEntry(ecid1, extent1, tserver1)));
-
-    assertEquals(0, refreshes.stream().count());
-  }
-
-  @Test
-  public void testRefreshStorage() {
-    var extent1 = new KeyExtent(TableId.of("1"), null, null);
-    var extent2 = new KeyExtent(TableId.of("2"), new Text("m"), new Text("c"));
-
-    // the root level only expects the root tablet extent
-    assertThrows(IllegalArgumentException.class,
-        () -> testRefreshes(Ample.DataLevel.ROOT, extent1, extent2));
-    testRefreshes(Ample.DataLevel.ROOT, RootTable.EXTENT, RootTable.EXTENT);
-    testRefreshes(Ample.DataLevel.METADATA, extent1, extent2);
-    testRefreshes(Ample.DataLevel.USER, extent1, extent2);
-  }
-}
