This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 989011f6f0 moved merge code out of TGW and into Fate (#3854)
989011f6f0 is described below

commit 989011f6f0b8752af7850363fc4254764d8248f5
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Tue Oct 17 16:52:46 2023 -0400

    moved merge code out of TGW and into Fate (#3854)
    
    Most of the code in the new DeleteRows and MergeTablets classes was
    copied from TabletGroupWatcher with slight modifications (such as adding
    the Manager as a parameter).
---
 .../accumulo/server/manager/state/MergeState.java  |   4 -
 .../server/manager/state/MergeInfoTest.java        |   4 +-
 .../java/org/apache/accumulo/manager/Manager.java  |   4 +-
 .../accumulo/manager/TabletGroupWatcher.java       | 753 +--------------------
 .../apache/accumulo/manager/state/MergeStats.java  | 238 -------
 .../manager/tableOps/merge/DeleteRows.java         | 600 ++++++++++++++++
 ...bleRangeOpWait.java => FinishTableRangeOp.java} |  17 +-
 .../manager/tableOps/merge/MergeTablets.java       | 282 ++++++++
 .../manager/tableOps/merge/TableRangeOp.java       |   6 +-
 .../manager/tableOps/merge/WaitForOffline.java     |  81 +++
 .../merge/DeleteRowsTest.java}                     |  10 +-
 .../apache/accumulo/test/manager/MergeStateIT.java | 247 -------
 12 files changed, 984 insertions(+), 1262 deletions(-)

diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MergeState.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MergeState.java
index fc15a0b39c..9bf6616bdf 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MergeState.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MergeState.java
@@ -23,10 +23,6 @@ public enum MergeState {
    * Not merging
    */
   NONE,
-  /**
-   * created, stored in zookeeper, other merges are prevented on the table
-   */
-  STARTED,
   /**
    * when the number of chopped tablets in the range matches the number of 
online tablets in the
    * range, take the tablets offline
diff --git a/server/base/src/test/java/org/apache/accumulo/server/manager/state/MergeInfoTest.java b/server/base/src/test/java/org/apache/accumulo/server/manager/state/MergeInfoTest.java
index 01ae58bf24..2d7a3a7927 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/manager/state/MergeInfoTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/manager/state/MergeInfoTest.java
@@ -76,7 +76,7 @@ public class MergeInfoTest {
     Text prevEndRow = new Text("begin");
     keyExtent = new KeyExtent(TableId.of(table), endRow, prevEndRow);
     mi = new MergeInfo(keyExtent, MergeInfo.Operation.DELETE);
-    mi.setState(MergeState.STARTED);
+    mi.setState(MergeState.WAITING_FOR_OFFLINE);
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     DataOutputStream dos = new DataOutputStream(baos);
     mi.write(dos);
@@ -84,7 +84,7 @@ public class MergeInfoTest {
     DataInputStream dis = new DataInputStream(bais);
     mi = new MergeInfo();
     mi.readFields(dis);
-    assertSame(MergeState.STARTED, mi.getState());
+    assertSame(MergeState.WAITING_FOR_OFFLINE, mi.getState());
     assertEquals(keyExtent, mi.getExtent());
     assertSame(MergeInfo.Operation.DELETE, mi.getOperation());
   }
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 21183349f5..f10dbd25c1 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -539,7 +539,7 @@ public class Manager extends AbstractServer
           throw new AssertionError("Unlikely", ex);
         }
         context.getZooReaderWriter().putPersistentData(path, out.getData(),
-            state.equals(MergeState.STARTED) ? ZooUtil.NodeExistsPolicy.FAIL
+            state.equals(MergeState.WAITING_FOR_OFFLINE) ? 
ZooUtil.NodeExistsPolicy.FAIL
                 : ZooUtil.NodeExistsPolicy.OVERWRITE);
       }
       mergeLock.notifyAll();
@@ -711,8 +711,6 @@ public class Manager extends AbstractServer
             case NONE:
             case COMPLETE:
               break;
-            case STARTED:
-              return TabletGoalState.HOSTED;
             case WAITING_FOR_OFFLINE:
             case MERGING:
               return TabletGoalState.UNASSIGNED;
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
index 31c15071a4..29fe40cdce 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
@@ -26,44 +26,29 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Objects;
-import java.util.Optional;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeMap;
-import java.util.TreeSet;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Predicate;
-import java.util.stream.Stream;
 
-import org.apache.accumulo.core.client.AccumuloClient;
-import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.MutationsRejectedException;
-import org.apache.accumulo.core.client.RowIterator;
 import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.client.admin.TabletHostingGoal;
-import org.apache.accumulo.core.clientImpl.TabletHostingGoalUtil;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.PartialKey;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
-import org.apache.accumulo.core.gc.ReferenceFile;
 import org.apache.accumulo.core.logging.TabletLogger;
 import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl;
 import org.apache.accumulo.core.manager.state.TabletManagement;
@@ -74,41 +59,25 @@ import org.apache.accumulo.core.manager.thrift.ManagerState;
 import org.apache.accumulo.core.manager.thrift.TabletServerStatus;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.core.metadata.StoredTabletFile;
 import org.apache.accumulo.core.metadata.TServerInstance;
 import org.apache.accumulo.core.metadata.TabletState;
-import org.apache.accumulo.core.metadata.schema.Ample;
 import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
-import org.apache.accumulo.core.metadata.schema.DataFileValue;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
-import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
-import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily;
-import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.HostingColumnFamily;
-import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily;
-import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
-import org.apache.accumulo.core.metadata.schema.MetadataTime;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
-import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
-import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.spi.balancer.data.TabletServerId;
-import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.TextUtil;
 import org.apache.accumulo.core.util.threads.Threads;
 import org.apache.accumulo.core.util.threads.Threads.AccumuloDaemonThread;
 import org.apache.accumulo.manager.Manager.TabletGoalState;
 import org.apache.accumulo.manager.split.SplitTask;
-import org.apache.accumulo.manager.state.MergeStats;
 import org.apache.accumulo.manager.state.TableCounts;
 import org.apache.accumulo.manager.state.TableStats;
-import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.ServiceEnvironmentImpl;
 import org.apache.accumulo.server.compaction.CompactionJobGenerator;
 import org.apache.accumulo.server.conf.TableConfiguration;
-import org.apache.accumulo.server.gc.AllVolumesDirectory;
 import org.apache.accumulo.server.log.WalStateManager;
 import org.apache.accumulo.server.log.WalStateManager.WalMarkerException;
 import org.apache.accumulo.server.manager.LiveTServerSet.TServerConnection;
@@ -116,23 +85,17 @@ import org.apache.accumulo.server.manager.state.Assignment;
 import org.apache.accumulo.server.manager.state.ClosableIterator;
 import org.apache.accumulo.server.manager.state.DistributedStoreException;
 import org.apache.accumulo.server.manager.state.MergeInfo;
-import org.apache.accumulo.server.manager.state.MergeState;
 import org.apache.accumulo.server.manager.state.TabletManagementIterator;
 import org.apache.accumulo.server.manager.state.TabletStateStore;
 import org.apache.accumulo.server.manager.state.UnassignedTablet;
-import org.apache.accumulo.server.tablets.TabletTime;
-import org.apache.accumulo.server.util.MetadataTableUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSortedSet;
 import com.google.common.collect.Iterators;
-import com.google.common.collect.Sets;
 
 abstract class TabletGroupWatcher extends AccumuloDaemonThread {
 
@@ -327,8 +290,6 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
   private static class TableMgmtStats {
     int[] counts = new int[TabletState.values().length];
     private int totalUnloaded;
-
-    Map<TableId,MergeStats> mergeStatsCache = new HashMap<>();
   }
 
   private TableMgmtStats manageTablets(Iterator<TabletManagement> iter,
@@ -339,10 +300,10 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
     TableMgmtStats tableMgmtStats = new TableMgmtStats();
     int unloaded = 0;
 
-    Map<TableId,MergeStats> currentMerges = new HashMap<>();
+    Map<TableId,MergeInfo> currentMerges = new HashMap<>();
     for (MergeInfo merge : manager.merges()) {
       if (merge.getExtent() != null) {
-        currentMerges.put(merge.getExtent().tableId(), new MergeStats(merge));
+        currentMerges.put(merge.getExtent().tableId(), merge);
       }
     }
 
@@ -391,11 +352,8 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
 
       final TableConfiguration tableConf = 
manager.getContext().getTableConfiguration(tableId);
 
-      final MergeStats mergeStats = 
tableMgmtStats.mergeStatsCache.computeIfAbsent(tableId, k -> {
-        var mStats = currentMerges.get(k);
-        return mStats != null ? mStats : new MergeStats(new MergeInfo());
-      });
-      TabletGoalState goal = manager.getGoalState(tm, 
mergeStats.getMergeInfo());
+      TabletGoalState goal = manager.getGoalState(tm,
+          currentMerges.computeIfAbsent(tm.getTableId(), k -> new 
MergeInfo()));
       TabletState state =
           TabletState.compute(tm, currentTServers.keySet(), 
manager.tabletBalancer, resourceGroups);
 
@@ -414,7 +372,6 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
       if (isFullScan) {
         stats.update(tableId, state);
       }
-      mergeStats.update(tm.getExtent(), state);
 
       // Always follow through with assignments
       if (state == TabletState.ASSIGNED) {
@@ -633,8 +590,6 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
               tabletMgmtStats.totalUnloaded);
         }
 
-        updateMergeState(tabletMgmtStats.mergeStatsCache);
-
         synchronized (this) {
           lastScanServers = 
ImmutableSortedSet.copyOf(currentTServers.keySet());
         }
@@ -809,706 +764,6 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
     return result;
   }
 
-  private void updateMergeState(Map<TableId,MergeStats> mergeStatsCache) {
-    for (MergeStats stats : mergeStatsCache.values()) {
-      try {
-        MergeState update = stats.nextMergeState(manager.getContext(), 
manager);
-        // when next state is MERGING, its important to persist this before
-        // starting the merge... the verification check that is done before
-        // moving into the merging state could fail if merge starts but does
-        // not finish
-        if (update == MergeState.COMPLETE) {
-          update = MergeState.NONE;
-        }
-        if (update != stats.getMergeInfo().getState()) {
-          manager.setMergeState(stats.getMergeInfo(), update);
-        }
-
-        if (update == MergeState.MERGING) {
-          try {
-            if (stats.getMergeInfo().isDelete()) {
-              deleteTablets(stats.getMergeInfo());
-            } else {
-              mergeMetadataRecords(stats.getMergeInfo());
-            }
-            update = MergeState.COMPLETE;
-            manager.setMergeState(stats.getMergeInfo(), update);
-          } catch (Exception ex) {
-            Manager.log.error("Unable merge metadata table records", ex);
-          }
-        }
-      } catch (Exception ex) {
-        Manager.log.error(
-            "Unable to update merge state for merge " + 
stats.getMergeInfo().getExtent(), ex);
-      }
-    }
-  }
-
-  // This method finds returns the deletion starting row (exclusive) for 
tablets that
-  // need to be actually deleted. If the startTablet is null then
-  // the deletion start row will just be null as all tablets are being deleted
-  // up to the end. Otherwise, this returns the endRow of the first tablet
-  // as the first tablet should be kept and will have been previously
-  // fenced if necessary
-  private Text getDeletionStartRow(final KeyExtent startTablet) {
-    if (startTablet == null) {
-      Manager.log.debug("First tablet for delete range is null");
-      return null;
-    }
-
-    final Text deletionStartRow = startTablet.endRow();
-    Manager.log.debug("Start row is {} for deletion", deletionStartRow);
-
-    return deletionStartRow;
-  }
-
-  // This method finds returns the deletion ending row (inclusive) for tablets 
that
-  // need to be actually deleted. If the endTablet is null then
-  // the deletion end row will just be null as all tablets are being deleted
-  // after the start row. Otherwise, this returns the prevEndRow of the last 
tablet
-  // as the last tablet should be kept and will have been previously
-  // fenced if necessary
-  private Text getDeletionEndRow(final KeyExtent endTablet) {
-    if (endTablet == null) {
-      Manager.log.debug("Last tablet for delete range is null");
-      return null;
-    }
-
-    Text deletionEndRow = endTablet.prevEndRow();
-    Manager.log.debug("Deletion end row is {}", deletionEndRow);
-
-    return deletionEndRow;
-  }
-
-  private static boolean isFirstTabletInTable(KeyExtent tablet) {
-    return tablet != null && tablet.prevEndRow() == null;
-  }
-
-  private static boolean isLastTabletInTable(KeyExtent tablet) {
-    return tablet != null && tablet.endRow() == null;
-  }
-
-  private static boolean areContiguousTablets(KeyExtent firstTablet, KeyExtent 
lastTablet) {
-    return firstTablet != null && lastTablet != null
-        && Objects.equals(firstTablet.endRow(), lastTablet.prevEndRow());
-  }
-
-  private boolean hasTabletsToDelete(final KeyExtent firstTabletInRange,
-      final KeyExtent lastTableInRange) {
-    // If the tablets are equal (and not null) then the deletion range is just 
part of 1 tablet
-    // which will be fenced so there are no tablets to delete. The null check 
is because if both
-    // are null then we are just deleting everything, so we do have tablets to 
delete
-    if (Objects.equals(firstTabletInRange, lastTableInRange) && 
firstTabletInRange != null) {
-      Manager.log.trace(
-          "No tablets to delete, firstTablet {} equals lastTablet {} in 
deletion range and was fenced.",
-          firstTabletInRange, lastTableInRange);
-      return false;
-      // If the lastTablet of the deletion range is the first tablet of the 
table it has been fenced
-      // already so nothing to actually delete before it
-    } else if (isFirstTabletInTable(lastTableInRange)) {
-      Manager.log.trace(
-          "No tablets to delete, lastTablet {} in deletion range is the first 
tablet of the table and was fenced.",
-          lastTableInRange);
-      return false;
-      // If the firstTablet of the deletion range is the last tablet of the 
table it has been fenced
-      // already so nothing to actually delete after it
-    } else if (isLastTabletInTable(firstTabletInRange)) {
-      Manager.log.trace(
-          "No tablets to delete, firstTablet {} in deletion range is the last 
tablet of the table and was fenced.",
-          firstTabletInRange);
-      return false;
-      // If the firstTablet and lastTablet are contiguous tablets then there 
is nothing to delete as
-      // each will be fenced and nothing between
-    } else if (areContiguousTablets(firstTabletInRange, lastTableInRange)) {
-      Manager.log.trace(
-          "No tablets to delete, firstTablet {} and lastTablet {} in deletion 
range are contiguous and were fenced.",
-          firstTabletInRange, lastTableInRange);
-      return false;
-    }
-
-    return true;
-  }
-
-  private void deleteTablets(MergeInfo info) throws AccumuloException {
-    // Before updated metadata and get the first and last tablets which
-    // are fenced if necessary
-    final Pair<KeyExtent,KeyExtent> firstAndLastTablets = 
updateMetadataRecordsForDelete(info);
-
-    // Find the deletion start row (exclusive) for tablets that need to be 
actually deleted
-    // This will be null if deleting everything up until the end row or it 
will be
-    // the endRow of the first tablet as the first tablet should be kept and 
will have
-    // already been fenced if necessary
-    final Text deletionStartRow = 
getDeletionStartRow(firstAndLastTablets.getFirst());
-
-    // Find the deletion end row (inclusive) for tablets that need to be 
actually deleted
-    // This will be null if deleting everything after the starting row or it 
will be
-    // the prevEndRow of the last tablet as the last tablet should be kept and 
will have
-    // already been fenced if necessary
-    Text deletionEndRow = getDeletionEndRow(firstAndLastTablets.getSecond());
-
-    // check if there are any tablets to delete and if not return
-    if (!hasTabletsToDelete(firstAndLastTablets.getFirst(), 
firstAndLastTablets.getSecond())) {
-      Manager.log.trace("No tablets to delete for range {}, returning", 
info.getExtent());
-      return;
-    }
-
-    // Build an extent for the actual deletion range
-    final KeyExtent extent =
-        new KeyExtent(info.getExtent().tableId(), deletionEndRow, 
deletionStartRow);
-    Manager.log.debug("Tablet deletion range is {}", extent);
-    String targetSystemTable = extent.isMeta() ? RootTable.NAME : 
MetadataTable.NAME;
-    Manager.log.debug("Deleting tablets for {}", extent);
-    MetadataTime metadataTime = null;
-    KeyExtent followingTablet = null;
-    Set<TabletHostingGoal> goals = new HashSet<>();
-    if (extent.endRow() != null) {
-      Key nextExtent = new Key(extent.endRow()).followingKey(PartialKey.ROW);
-      followingTablet =
-          getHighTablet(new KeyExtent(extent.tableId(), nextExtent.getRow(), 
extent.endRow()));
-      Manager.log.debug("Found following tablet {}", followingTablet);
-    }
-    try {
-      AccumuloClient client = manager.getContext();
-      ServerContext context = manager.getContext();
-      Ample ample = context.getAmple();
-      Text start = extent.prevEndRow();
-      if (start == null) {
-        start = new Text();
-      }
-      Manager.log.debug("Making file deletion entries for {}", extent);
-      Range deleteRange = new Range(TabletsSection.encodeRow(extent.tableId(), 
start), false,
-          TabletsSection.encodeRow(extent.tableId(), extent.endRow()), true);
-      Scanner scanner = client.createScanner(targetSystemTable, 
Authorizations.EMPTY);
-      scanner.setRange(deleteRange);
-      ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner);
-      ServerColumnFamily.TIME_COLUMN.fetch(scanner);
-      HostingColumnFamily.GOAL_COLUMN.fetch(scanner);
-      scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
-      scanner.fetchColumnFamily(CurrentLocationColumnFamily.NAME);
-      Set<ReferenceFile> datafilesAndDirs = new TreeSet<>();
-      for (Entry<Key,Value> entry : scanner) {
-        Key key = entry.getKey();
-        if (key.compareColumnFamily(DataFileColumnFamily.NAME) == 0) {
-          var stf = new 
StoredTabletFile(key.getColumnQualifierData().toString());
-          datafilesAndDirs.add(new ReferenceFile(stf.getTableId(), stf));
-          if (datafilesAndDirs.size() > 1000) {
-            ample.putGcFileAndDirCandidates(extent.tableId(), 
datafilesAndDirs);
-            datafilesAndDirs.clear();
-          }
-        } else if (ServerColumnFamily.TIME_COLUMN.hasColumns(key)) {
-          metadataTime = MetadataTime.parse(entry.getValue().toString());
-        } else if (key.compareColumnFamily(CurrentLocationColumnFamily.NAME) 
== 0) {
-          throw new IllegalStateException(
-              "Tablet " + key.getRow() + " is assigned during a merge!");
-        } else if (ServerColumnFamily.DIRECTORY_COLUMN.hasColumns(key)) {
-          var allVolumesDirectory =
-              new AllVolumesDirectory(extent.tableId(), 
entry.getValue().toString());
-          datafilesAndDirs.add(allVolumesDirectory);
-          if (datafilesAndDirs.size() > 1000) {
-            ample.putGcFileAndDirCandidates(extent.tableId(), 
datafilesAndDirs);
-            datafilesAndDirs.clear();
-          }
-        } else if (HostingColumnFamily.GOAL_COLUMN.hasColumns(key)) {
-          TabletHostingGoal thisGoal = 
TabletHostingGoalUtil.fromValue(entry.getValue());
-          goals.add(thisGoal);
-        }
-      }
-      ample.putGcFileAndDirCandidates(extent.tableId(), datafilesAndDirs);
-      BatchWriter bw = client.createBatchWriter(targetSystemTable);
-      try {
-        deleteTablets(info, deleteRange, bw, client);
-      } finally {
-        bw.close();
-      }
-
-      if (followingTablet != null) {
-        Manager.log.debug("Updating prevRow of {} to {}", followingTablet, 
extent.prevEndRow());
-        bw = client.createBatchWriter(targetSystemTable);
-        try {
-          Mutation m = new Mutation(followingTablet.toMetaRow());
-          TabletColumnFamily.PREV_ROW_COLUMN.put(m,
-              TabletColumnFamily.encodePrevEndRow(extent.prevEndRow()));
-          bw.addMutation(m);
-          bw.flush();
-        } finally {
-          bw.close();
-        }
-      } else {
-        // Recreate the default tablet to hold the end of the table
-        MetadataTableUtil.addTablet(new KeyExtent(extent.tableId(), null, 
extent.prevEndRow()),
-            ServerColumnFamily.DEFAULT_TABLET_DIR_NAME, manager.getContext(),
-            metadataTime.getType(), manager.managerLock, 
getMergeHostingGoal(extent, goals));
-      }
-    } catch (RuntimeException | TableNotFoundException ex) {
-      throw new AccumuloException(ex);
-    }
-  }
-
-  private void mergeMetadataRecords(MergeInfo info) throws AccumuloException {
-    KeyExtent range = info.getExtent();
-    Manager.log.debug("Merging metadata for {}", range);
-    KeyExtent stop = getHighTablet(range);
-    Manager.log.debug("Highest tablet is {}", stop);
-    Value firstPrevRowValue = null;
-    Text stopRow = stop.toMetaRow();
-    Text start = range.prevEndRow();
-    if (start == null) {
-      start = new Text();
-    }
-    Range scanRange =
-        new Range(TabletsSection.encodeRow(range.tableId(), start), false, 
stopRow, false);
-    String targetSystemTable = MetadataTable.NAME;
-    if (range.isMeta()) {
-      targetSystemTable = RootTable.NAME;
-    }
-    Set<TabletHostingGoal> goals = new HashSet<>();
-
-    AccumuloClient client = manager.getContext();
-
-    KeyExtent stopExtent = KeyExtent.fromMetaRow(stop.toMetaRow());
-    KeyExtent previousKeyExtent = null;
-    KeyExtent lastExtent = null;
-
-    try (BatchWriter bw = client.createBatchWriter(targetSystemTable)) {
-      long fileCount = 0;
-      // Make file entries in highest tablet
-      Scanner scanner = client.createScanner(targetSystemTable, 
Authorizations.EMPTY);
-      // Update to set the range to include the highest tablet
-      scanner.setRange(
-          new Range(TabletsSection.encodeRow(range.tableId(), start), false, 
stopRow, true));
-      TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
-      ServerColumnFamily.TIME_COLUMN.fetch(scanner);
-      ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner);
-      HostingColumnFamily.GOAL_COLUMN.fetch(scanner);
-      scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
-      Mutation m = new Mutation(stopRow);
-      MetadataTime maxLogicalTime = null;
-      for (Entry<Key,Value> entry : scanner) {
-        Key key = entry.getKey();
-        Value value = entry.getValue();
-
-        final KeyExtent keyExtent = KeyExtent.fromMetaRow(key.getRow());
-
-        // Keep track of the last Key Extent seen so we can use it to fence
-        // of RFiles when merging the metadata
-        if (lastExtent != null && !keyExtent.equals(lastExtent)) {
-          previousKeyExtent = lastExtent;
-        }
-
-        // Special case to handle the highest/stop tablet, which is where 
files are
-        // merged to. The existing merge code won't delete files from this 
tablet
-        // so we need to handle the deletes in this tablet when fencing files.
-        // We may be able to make this simpler in the future.
-        if (keyExtent.equals(stopExtent)) {
-          if (previousKeyExtent != null
-              && key.getColumnFamily().equals(DataFileColumnFamily.NAME)) {
-
-            // Fence off existing files by the end row of the previous tablet 
and current tablet
-            final StoredTabletFile existing = 
StoredTabletFile.of(key.getColumnQualifier());
-            // The end row should be inclusive for the current tablet and the 
previous end row
-            // should be exclusive for the start row
-            Range fenced = new Range(previousKeyExtent.endRow(), false, 
keyExtent.endRow(), true);
-
-            // Clip range if exists
-            fenced = existing.hasRange() ? existing.getRange().clip(fenced) : 
fenced;
-
-            final StoredTabletFile newFile = 
StoredTabletFile.of(existing.getPath(), fenced);
-            // If the existing metadata does not match then we need to delete 
the old
-            // and replace with a new range
-            if (!existing.equals(newFile)) {
-              m.putDelete(DataFileColumnFamily.NAME, 
existing.getMetadataText());
-              m.put(key.getColumnFamily(), newFile.getMetadataText(), value);
-            }
-
-            fileCount++;
-          }
-          // For the highest tablet we only care about the DataFileColumnFamily
-          continue;
-        }
-
-        // Handle metadata updates for all other tablets except the highest 
tablet
-        // Ranges are created for the files and then added to the highest 
tablet in
-        // the merge range. Deletes are handled later for the old files when 
the tablets
-        // are removed.
-        if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) {
-          final StoredTabletFile existing = 
StoredTabletFile.of(key.getColumnQualifier());
-
-          // Fence off files by the previous tablet and current tablet that is 
being merged
-          // The end row should be inclusive for the current tablet and the 
previous end row should
-          // be exclusive for the start row.
-          Range fenced = new Range(previousKeyExtent != null ? 
previousKeyExtent.endRow() : null,
-              false, keyExtent.endRow(), true);
-
-          // Clip range with the tablet range if the range already exists
-          fenced = existing.hasRange() ? existing.getRange().clip(fenced) : 
fenced;
-
-          // Move the file and range to the last tablet
-          StoredTabletFile newFile = StoredTabletFile.of(existing.getPath(), 
fenced);
-          m.put(key.getColumnFamily(), newFile.getMetadataText(), value);
-
-          fileCount++;
-        } else if (TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)
-            && firstPrevRowValue == null) {
-          Manager.log.debug("prevRow entry for lowest tablet is {}", value);
-          firstPrevRowValue = new Value(value);
-        } else if (ServerColumnFamily.TIME_COLUMN.hasColumns(key)) {
-          maxLogicalTime =
-              TabletTime.maxMetadataTime(maxLogicalTime, 
MetadataTime.parse(value.toString()));
-        } else if (ServerColumnFamily.DIRECTORY_COLUMN.hasColumns(key)) {
-          var allVolumesDir = new AllVolumesDirectory(range.tableId(), 
value.toString());
-          
bw.addMutation(manager.getContext().getAmple().createDeleteMutation(allVolumesDir));
-        } else if (HostingColumnFamily.GOAL_COLUMN.hasColumns(key)) {
-          TabletHostingGoal thisGoal = TabletHostingGoalUtil.fromValue(value);
-          goals.add(thisGoal);
-        }
-
-        lastExtent = keyExtent;
-      }
-
-      // read the logical time from the last tablet in the merge range, it is 
not included in
-      // the loop above
-      scanner = client.createScanner(targetSystemTable, Authorizations.EMPTY);
-      scanner.setRange(new Range(stopRow));
-      ServerColumnFamily.TIME_COLUMN.fetch(scanner);
-      HostingColumnFamily.GOAL_COLUMN.fetch(scanner);
-      scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME);
-      Set<String> extCompIds = new HashSet<>();
-      for (Entry<Key,Value> entry : scanner) {
-        if (ServerColumnFamily.TIME_COLUMN.hasColumns(entry.getKey())) {
-          maxLogicalTime = TabletTime.maxMetadataTime(maxLogicalTime,
-              MetadataTime.parse(entry.getValue().toString()));
-        } else if 
(ExternalCompactionColumnFamily.NAME.equals(entry.getKey().getColumnFamily())) {
-          extCompIds.add(entry.getKey().getColumnQualifierData().toString());
-        } else if (HostingColumnFamily.GOAL_COLUMN.hasColumns(entry.getKey())) 
{
-          TabletHostingGoal thisGoal = 
TabletHostingGoalUtil.fromValue(entry.getValue());
-          goals.add(thisGoal);
-        }
-      }
-
-      if (maxLogicalTime != null) {
-        ServerColumnFamily.TIME_COLUMN.put(m, new Value(maxLogicalTime.encode()));
-      }
-
-      // delete any entries for external compactions
-      extCompIds.forEach(ecid -> m.putDelete(ExternalCompactionColumnFamily.STR_NAME, ecid));
-
-      // Set the TabletHostingGoal for this tablet based on the goals of the other tablets in
-      // the merge range. Always takes priority over never.
-      TabletHostingGoal mergeHostingGoal = getMergeHostingGoal(range, goals);
-      HostingColumnFamily.GOAL_COLUMN.put(m, TabletHostingGoalUtil.toValue(mergeHostingGoal));
-
-      if (!m.getUpdates().isEmpty()) {
-        bw.addMutation(m);
-      }
-
-      bw.flush();
-
-      Manager.log.debug("Moved {} files to {}", fileCount, stop);
-
-      if (firstPrevRowValue == null) {
-        Manager.log.debug("tablet already merged");
-        return;
-      }
-
-      stop = new KeyExtent(stop.tableId(), stop.endRow(),
-          TabletColumnFamily.decodePrevEndRow(firstPrevRowValue));
-      Mutation updatePrevRow = TabletColumnFamily.createPrevRowMutation(stop);
-      Manager.log.debug("Setting the prevRow for last tablet: {}", stop);
-      bw.addMutation(updatePrevRow);
-      bw.flush();
-
-      deleteTablets(info, scanRange, bw, client);
-
-    } catch (Exception ex) {
-      throw new AccumuloException(ex);
-    }
-  }
-
-  private static TabletHostingGoal getMergeHostingGoal(KeyExtent range,
-      Set<TabletHostingGoal> goals) {
-    TabletHostingGoal mergeHostingGoal = TabletHostingGoal.ONDEMAND;
-    if (range.isMeta() || goals.contains(TabletHostingGoal.ALWAYS)) {
-      mergeHostingGoal = TabletHostingGoal.ALWAYS;
-    } else if (goals.contains(TabletHostingGoal.NEVER)) {
-      mergeHostingGoal = TabletHostingGoal.NEVER;
-    }
-    return mergeHostingGoal;
-  }
-
-  // This method is used to detect if a tablet needs to be split/chopped for a delete
-  // Instead of performing a split or chop compaction, the tablet will have its files fenced.
-  private boolean needsFencingForDeletion(MergeInfo info, KeyExtent keyExtent) {
-    // Does this extent cover the end points of the delete?
-    final Predicate<Text> isWithin = r -> r != null && keyExtent.contains(r);
-    final Predicate<Text> isNotBoundary =
-        r -> !r.equals(keyExtent.endRow()) && !r.equals(keyExtent.prevEndRow());
-    final KeyExtent deleteRange = info.getExtent();
-
-    return (keyExtent.overlaps(deleteRange) && Stream
-        .of(deleteRange.prevEndRow(), deleteRange.endRow()).anyMatch(isWithin.and(isNotBoundary)))
-        || info.needsToBeChopped(keyExtent);
-  }
-
-  // Instead of splitting or chopping tablets for a delete we instead create ranges
-  // to exclude the portion of the tablet that should be deleted
-  private Text followingRow(Text row) {
-    if (row == null) {
-      return null;
-    }
-    return new Key(row).followingKey(PartialKey.ROW).getRow();
-  }
-
-  // Instead of splitting or chopping tablets for a delete we instead create ranges
-  // to exclude the portion of the tablet that should be deleted
-  private List<Range> createRangesForDeletion(TabletMetadata tabletMetadata,
-      final KeyExtent deleteRange) {
-    final KeyExtent tabletExtent = tabletMetadata.getExtent();
-
-    // If the delete range wholly contains the tablet being deleted then there is no range to clip
-    // files to because the files should be completely dropped.
-    Preconditions.checkArgument(!deleteRange.contains(tabletExtent), "delete range:%s tablet:%s",
-        deleteRange, tabletExtent);
-
-    final List<Range> ranges = new ArrayList<>();
-
-    if (deleteRange.overlaps(tabletExtent)) {
-      if (deleteRange.prevEndRow() != null
-          && tabletExtent.contains(followingRow(deleteRange.prevEndRow()))) {
-        Manager.log.trace("Fencing tablet {} files to ({},{}]", tabletExtent,
-            tabletExtent.prevEndRow(), deleteRange.prevEndRow());
-        ranges.add(new Range(tabletExtent.prevEndRow(), false, deleteRange.prevEndRow(), true));
-      }
-
-      // This covers the case of when a deletion range overlaps the last tablet. We need to create a
-      // range that excludes the deletion.
-      if (deleteRange.endRow() != null
-          && tabletMetadata.getExtent().contains(deleteRange.endRow())) {
-        Manager.log.trace("Fencing tablet {} files to ({},{}]", tabletExtent, deleteRange.endRow(),
-            tabletExtent.endRow());
-        ranges.add(new Range(deleteRange.endRow(), false, tabletExtent.endRow(), true));
-      }
-    } else {
-      Manager.log.trace(
-          "Fencing tablet {} files to itself because it does not overlap delete range",
-          tabletExtent);
-      ranges.add(tabletExtent.toDataRange());
-    }
-
-    return ranges;
-  }
-
-  private Pair<KeyExtent,KeyExtent> updateMetadataRecordsForDelete(MergeInfo info)
-      throws AccumuloException {
-    final KeyExtent range = info.getExtent();
-
-    String targetSystemTable = MetadataTable.NAME;
-    if (range.isMeta()) {
-      targetSystemTable = RootTable.NAME;
-    }
-    final Pair<KeyExtent,KeyExtent> startAndEndTablets;
-
-    final AccumuloClient client = manager.getContext();
-
-    try (BatchWriter bw = client.createBatchWriter(targetSystemTable)) {
-      final Text startRow = range.prevEndRow();
-      final Text endRow = range.endRow() != null
-          ? new Key(range.endRow()).followingKey(PartialKey.ROW).getRow() : null;
-
-      // Find the tablets that overlap the start and end row of the deletion range
-      // If the startRow is null then there will be an empty startTablet we don't need
-      // to fence a starting tablet as we are deleting everything up to the end tablet
-      // Likewise, if the endRow is null there will be an empty endTablet as we are deleting
-      // all tablets after the starting tablet
-      final Optional<TabletMetadata> startTablet = Optional.ofNullable(startRow).flatMap(
-          row -> loadTabletMetadata(range.tableId(), row, ColumnType.PREV_ROW, ColumnType.FILES));
-      final Optional<TabletMetadata> endTablet = Optional.ofNullable(endRow).flatMap(
-          row -> loadTabletMetadata(range.tableId(), row, ColumnType.PREV_ROW, ColumnType.FILES));
-
-      // Store the tablets in a Map if present so that if we have the same Tablet we
-      // only need to process the same tablet once when fencing
-      final SortedMap<KeyExtent,TabletMetadata> tabletMetadatas = new TreeMap<>();
-      startTablet.ifPresent(ft -> tabletMetadatas.put(ft.getExtent(), ft));
-      endTablet.ifPresent(lt -> tabletMetadatas.putIfAbsent(lt.getExtent(), lt));
-
-      // Capture the tablets to return them or null if not loaded
-      startAndEndTablets = new Pair<>(startTablet.map(TabletMetadata::getExtent).orElse(null),
-          endTablet.map(TabletMetadata::getExtent).orElse(null));
-
-      for (TabletMetadata tabletMetadata : tabletMetadatas.values()) {
-        final KeyExtent keyExtent = tabletMetadata.getExtent();
-
-        // Check if this tablet needs to have its files fenced for the deletion
-        if (needsFencingForDeletion(info, keyExtent)) {
-          Manager.log.debug("Found overlapping keyExtent {} for delete, fencing files.", keyExtent);
-
-          // Create the ranges for fencing the files, this takes the place of
-          // chop compactions and splits
-          final List<Range> ranges = createRangesForDeletion(tabletMetadata, range);
-          Preconditions.checkState(!ranges.isEmpty(),
-              "No ranges found that overlap deletion range.");
-
-          // Go through and fence each of the files that are part of the tablet
-          for (Entry<StoredTabletFile,DataFileValue> entry : tabletMetadata.getFilesMap()
-              .entrySet()) {
-            final StoredTabletFile existing = entry.getKey();
-            final DataFileValue value = entry.getValue();
-
-            final Mutation m = new Mutation(keyExtent.toMetaRow());
-
-            // Go through each range that was created and modify the metadata for the file
-            // The end row should be inclusive for the current tablet and the previous end row
-            // should be exclusive for the start row.
-            final Set<StoredTabletFile> newFiles = new HashSet<>();
-            final Set<StoredTabletFile> existingFile = Set.of(existing);
-
-            for (Range fenced : ranges) {
-              // Clip range with the tablet range if the range already exists
-              fenced = existing.hasRange() ? existing.getRange().clip(fenced, true) : fenced;
-
-              // If null the range is disjoint which can happen if there are existing fenced files
-              // If the existing file is disjoint then later we will delete if the file is not part
-              // of the newFiles set which means it is disjoint with all ranges
-              if (fenced != null) {
-                final StoredTabletFile newFile = StoredTabletFile.of(existing.getPath(), fenced);
-                Manager.log.trace("Adding new file {} with range {}", newFile.getMetadataPath(),
-                    newFile.getRange());
-
-                // Add the new file to the newFiles set, it will be added later if it doesn't match
-                // the existing file already. We still need to add to the set to be checked later
-                // even if it matches the existing file as later the deletion logic will check to
-                // see if the existing file is part of this set before deleting. This is done to
-                // make sure the existing file isn't deleted unless it is not needed/disjoint
-                // with all ranges.
-                newFiles.add(newFile);
-              } else {
-                Manager.log.trace("Found a disjoint file {} with  range {} on delete",
-                    existing.getMetadataPath(), existing.getRange());
-              }
-            }
-
-            // If the existingFile is not contained in the newFiles set then we can delete it
-            Sets.difference(existingFile, newFiles).forEach(
-                delete -> m.putDelete(DataFileColumnFamily.NAME, existing.getMetadataText()));
-
-            // Add any new files that don't match the existingFile
-            // As of now we will only have at most 2 files as up to 2 ranges are created
-            final List<StoredTabletFile> filesToAdd =
-                new ArrayList<>(Sets.difference(newFiles, existingFile));
-            Preconditions.checkArgument(filesToAdd.size() <= 2,
-                "There should only be at most 2 StoredTabletFiles after computing new ranges.");
-
-            // If more than 1 new file then re-calculate the num entries and size
-            if (filesToAdd.size() == 2) {
-              // This splits up the values in half and makes sure they total the original
-              // values
-              final Pair<DataFileValue,DataFileValue> newDfvs = computeNewDfv(value);
-              m.put(DataFileColumnFamily.NAME, filesToAdd.get(0).getMetadataText(),
-                  newDfvs.getFirst().encodeAsValue());
-              m.put(DataFileColumnFamily.NAME, filesToAdd.get(1).getMetadataText(),
-                  newDfvs.getSecond().encodeAsValue());
-            } else {
-              // Will be 0 or 1 files
-              filesToAdd.forEach(newFile -> m.put(DataFileColumnFamily.NAME,
-                  newFile.getMetadataText(), value.encodeAsValue()));
-            }
-
-            if (!m.getUpdates().isEmpty()) {
-              bw.addMutation(m);
-            }
-          }
-        } else {
-          Manager.log.debug(
-              "Skipping metadata update on file for keyExtent {} for delete as not overlapping on rows.",
-              keyExtent);
-        }
-      }
-
-      bw.flush();
-
-      return startAndEndTablets;
-    } catch (Exception ex) {
-      throw new AccumuloException(ex);
-    }
-  }
-
-  // Divide each new DFV in half and make sure the sum equals the original
-  @VisibleForTesting
-  protected static Pair<DataFileValue,DataFileValue> computeNewDfv(DataFileValue value) {
-    final DataFileValue file1Value = new DataFileValue(Math.max(1, value.getSize() / 2),
-        Math.max(1, value.getNumEntries() / 2), value.getTime());
-
-    final DataFileValue file2Value =
-        new DataFileValue(Math.max(1, value.getSize() - file1Value.getSize()),
-            Math.max(1, value.getNumEntries() - file1Value.getNumEntries()), value.getTime());
-
-    return new Pair<>(file1Value, file2Value);
-  }
-
-  private Optional<TabletMetadata> loadTabletMetadata(TableId tabletId, final Text row,
-      ColumnType... columns) {
-    try (TabletsMetadata tabletsMetadata = manager.getContext().getAmple().readTablets()
-        .forTable(tabletId).overlapping(row, true, row).fetch(columns).build()) {
-      return tabletsMetadata.stream().findFirst();
-    }
-  }
-
-  private void deleteTablets(MergeInfo info, Range scanRange, BatchWriter bw, AccumuloClient client)
-      throws TableNotFoundException, MutationsRejectedException {
-    Scanner scanner;
-    Mutation m;
-    // Delete everything in the other tablets
-    // group all deletes into tablet into one mutation, this makes tablets
-    // either disappear entirely or not all.. this is important for the case
-    // where the process terminates in the loop below...
-    scanner = client.createScanner(info.getExtent().isMeta() ? RootTable.NAME : MetadataTable.NAME,
-        Authorizations.EMPTY);
-    Manager.log.debug("Deleting range {}", scanRange);
-    scanner.setRange(scanRange);
-    RowIterator rowIter = new RowIterator(scanner);
-    while (rowIter.hasNext()) {
-      Iterator<Entry<Key,Value>> row = rowIter.next();
-      m = null;
-      while (row.hasNext()) {
-        Entry<Key,Value> entry = row.next();
-        Key key = entry.getKey();
-
-        if (m == null) {
-          m = new Mutation(key.getRow());
-        }
-
-        m.putDelete(key.getColumnFamily(), key.getColumnQualifier());
-        Manager.log.debug("deleting entry {}", key);
-      }
-      bw.addMutation(m);
-    }
-
-    bw.flush();
-  }
-
-  private KeyExtent getHighTablet(KeyExtent range) throws AccumuloException {
-    try {
-      AccumuloClient client = manager.getContext();
-      Scanner scanner = client.createScanner(range.isMeta() ? RootTable.NAME : MetadataTable.NAME,
-          Authorizations.EMPTY);
-      TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
-      KeyExtent start = new KeyExtent(range.tableId(), range.endRow(), null);
-      scanner.setRange(new Range(start.toMetaRow(), null));
-      Iterator<Entry<Key,Value>> iterator = scanner.iterator();
-      if (!iterator.hasNext()) {
-        throw new AccumuloException("No last tablet for a merge " + range);
-      }
-      Entry<Key,Value> entry = iterator.next();
-      KeyExtent highTablet = KeyExtent.fromMetaPrevRow(entry);
-      if (!highTablet.tableId().equals(range.tableId())) {
-        throw new AccumuloException("No last tablet for merge " + range + " " + highTablet);
-      }
-      return highTablet;
-    } catch (Exception ex) {
-      throw new AccumuloException("Unexpected failure finding the last tablet for a merge " + range,
-          ex);
-    }
-  }
-
   private void handleDeadTablets(TabletLists tLists)
       throws WalMarkerException, DistributedStoreException {
     var deadTablets = tLists.assignedToDeadServers;
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/state/MergeStats.java b/server/manager/src/main/java/org/apache/accumulo/manager/state/MergeStats.java
deleted file mode 100644
index 59e4b087f1..0000000000
--- a/server/manager/src/main/java/org/apache/accumulo/manager/state/MergeStats.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.manager.state;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.Accumulo;
-import org.apache.accumulo.core.client.AccumuloClient;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.TableId;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.dataImpl.KeyExtent;
-import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
-import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
-import org.apache.accumulo.core.manager.state.TabletManagement;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.core.metadata.TServerInstance;
-import org.apache.accumulo.core.metadata.TabletState;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
-import org.apache.accumulo.core.metadata.schema.TabletMetadata;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.trace.TraceUtil;
-import org.apache.accumulo.server.cli.ServerUtilOpts;
-import org.apache.accumulo.server.manager.state.CurrentState;
-import org.apache.accumulo.server.manager.state.MergeInfo;
-import org.apache.accumulo.server.manager.state.MergeState;
-import org.apache.accumulo.server.manager.state.TabletManagementIterator;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.Text;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import io.opentelemetry.api.trace.Span;
-import io.opentelemetry.context.Scope;
-
-public class MergeStats {
-  final static private Logger log = LoggerFactory.getLogger(MergeStats.class);
-  private final MergeInfo info;
-  private int hosted = 0;
-  private int unassigned = 0;
-  private int total = 0;
-
-  public MergeStats(MergeInfo info) {
-    this.info = info;
-  }
-
-  public MergeInfo getMergeInfo() {
-    return info;
-  }
-
-  public void update(KeyExtent ke, TabletState state) {
-    if (info.getState().equals(MergeState.NONE)) {
-      return;
-    }
-    if (!info.overlaps(ke)) {
-      return;
-    }
-    this.total++;
-    if (state.equals(TabletState.HOSTED)) {
-      this.hosted++;
-    }
-    if (state.equals(TabletState.UNASSIGNED) || state.equals(TabletState.SUSPENDED)) {
-      this.unassigned++;
-    }
-  }
-
-  public MergeState nextMergeState(AccumuloClient accumuloClient, CurrentState manager)
-      throws Exception {
-    MergeState state = info.getState();
-    if (state == MergeState.NONE) {
-      return state;
-    }
-    if (total == 0) {
-      log.trace("failed to see any tablets for this range, ignoring {}", info.getExtent());
-      return state;
-    }
-    log.info("Computing next merge state for {} which is presently {} isDelete : {}",
-        info.getExtent(), state, info.isDelete());
-    if (state == MergeState.STARTED) {
-      log.info("{} are hosted, total {}", hosted, total);
-      if (!info.isDelete() && total == 1) {
-        log.info("Merge range is already contained in a single tablet {}", info.getExtent());
-        state = MergeState.COMPLETE;
-      } else if (hosted == total) {
-        state = MergeState.WAITING_FOR_OFFLINE;
-      } else {
-        log.info("Waiting for {} hosted tablets to be {} {}", hosted, total, info.getExtent());
-      }
-    }
-    if (state == MergeState.WAITING_FOR_OFFLINE) {
-      if (unassigned == total) {
-        if (verifyMergeConsistency(accumuloClient, manager)) {
-          state = MergeState.MERGING;
-        } else {
-          log.info("Merge consistency check failed {}", info.getExtent());
-        }
-      } else {
-        log.info("Waiting for {} unassigned tablets to be {} {}", unassigned, total,
-            info.getExtent());
-      }
-    }
-    if (state == MergeState.MERGING) {
-      if (hosted != 0) {
-        // Shouldn't happen
-        log.error("Unexpected state: hosted tablets should be zero {} merge {}", hosted,
-            info.getExtent());
-        state = MergeState.WAITING_FOR_OFFLINE;
-      }
-      if (unassigned != total) {
-        // Shouldn't happen
-        log.error("Unexpected state: unassigned tablets should be {} was {} merge {}", total,
-            unassigned, info.getExtent());
-        state = MergeState.WAITING_FOR_OFFLINE;
-      }
-      log.info("{} tablets are unassigned {}", unassigned, info.getExtent());
-    }
-    return state;
-  }
-
-  private boolean verifyMergeConsistency(AccumuloClient accumuloClient, CurrentState manager)
-      throws TableNotFoundException, IOException {
-    MergeStats verify = new MergeStats(info);
-    KeyExtent extent = info.getExtent();
-    Scanner scanner = accumuloClient
-        .createScanner(extent.isMeta() ? RootTable.NAME : MetadataTable.NAME, Authorizations.EMPTY);
-    TabletManagementIterator.configureScanner(scanner, manager);
-    Text start = extent.prevEndRow();
-    if (start == null) {
-      start = new Text();
-    }
-    TableId tableId = extent.tableId();
-    Text first = TabletsSection.encodeRow(tableId, start);
-    Range range = new Range(first, false, null, true);
-    scanner.setRange(range.clip(TabletsSection.getRange()));
-    KeyExtent prevExtent = null;
-
-    log.debug("Scanning range {}", range);
-    for (Entry<Key,Value> entry : scanner) {
-      final TabletManagement mti = TabletManagementIterator.decode(entry);
-      final TabletMetadata tm = mti.getTabletMetadata();
-
-      log.debug("consistency check: {} walogs {}", tm, tm.getLogs().size());
-      if (!tm.getTableId().equals(tableId)) {
-        break;
-      }
-
-      if (prevExtent == null) {
-        // this is the first tablet observed, it must be offline and its prev row must be less than
-        // the start of the merge range
-        if (tm.getExtent().prevEndRow() != null
-            && tm.getExtent().prevEndRow().compareTo(start) > 0) {
-          log.debug("failing consistency: prev row is too high {}", start);
-          return false;
-        }
-
-        Set<TServerInstance> liveTServers1 = manager.onlineTabletServers();
-        if (TabletState.compute(tm, liveTServers1) != TabletState.UNASSIGNED) {
-          Set<TServerInstance> liveTServers = manager.onlineTabletServers();
-          if (TabletState.compute(tm, liveTServers) != TabletState.SUSPENDED) {
-            log.debug("failing consistency: assigned or hosted {}", tm);
-            return false;
-          }
-        }
-
-      } else if (!tm.getExtent().isPreviousExtent(prevExtent)) {
-        log.debug("hole in {}", MetadataTable.NAME);
-        return false;
-      }
-
-      prevExtent = tm.getExtent();
-
-      Set<TServerInstance> liveTServers = manager.onlineTabletServers();
-      verify.update(tm.getExtent(), TabletState.compute(tm, liveTServers));
-      // stop when we've seen the tablet just beyond our range
-      if (tm.getExtent().prevEndRow() != null && extent.endRow() != null
-          && tm.getExtent().prevEndRow().compareTo(extent.endRow()) > 0) {
-        break;
-      }
-    }
-    log.debug("unassigned {} v.unassigned {} verify.total {}", unassigned, verify.unassigned,
-        verify.total);
-
-    return unassigned == verify.unassigned && unassigned == verify.total;
-  }
-
-  public static void main(String[] args) throws Exception {
-    ServerUtilOpts opts = new ServerUtilOpts();
-    opts.parseArgs(MergeStats.class.getName(), args);
-
-    Span span = TraceUtil.startSpan(MergeStats.class, "main");
-    try (Scope scope = span.makeCurrent()) {
-      try (AccumuloClient client = Accumulo.newClient().from(opts.getClientProps()).build()) {
-        Map<String,String> tableIdMap = client.tableOperations().tableIdMap();
-        ZooReaderWriter zooReaderWriter = opts.getServerContext().getZooReaderWriter();
-        for (Entry<String,String> entry : tableIdMap.entrySet()) {
-          final String table = entry.getKey(), tableId = entry.getValue();
-          String path = ZooUtil.getRoot(client.instanceOperations().getInstanceId())
-              + Constants.ZTABLES + "/" + tableId + "/merge";
-          MergeInfo info = new MergeInfo();
-          if (zooReaderWriter.exists(path)) {
-            byte[] data = zooReaderWriter.getData(path);
-            DataInputBuffer in = new DataInputBuffer();
-            in.reset(data, data.length);
-            info.readFields(in);
-          }
-          System.out.printf("%25s  %10s %10s %s%n", table, info.getState(), info.getOperation(),
-              info.getExtent());
-        }
-      }
-    } finally {
-      span.end();
-    }
-  }
-}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteRows.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteRows.java
new file mode 100644
index 0000000000..c6614cae5f
--- /dev/null
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteRows.java
@@ -0,0 +1,600 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager.tableOps.merge;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.RowIterator;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.TabletHostingGoal;
+import org.apache.accumulo.core.clientImpl.TabletHostingGoalUtil;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.NamespaceId;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.fate.Repo;
+import org.apache.accumulo.core.gc.ReferenceFile;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.StoredTabletFile;
+import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.metadata.schema.MetadataTime;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.manager.tableOps.ManagerRepo;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.gc.AllVolumesDirectory;
+import org.apache.accumulo.server.manager.state.MergeInfo;
+import org.apache.accumulo.server.manager.state.MergeState;
+import org.apache.accumulo.server.util.MetadataTableUtil;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+
+public class DeleteRows extends ManagerRepo {
+
+  private static final long serialVersionUID = 1L;
+
+  private static final Logger log = LoggerFactory.getLogger(DeleteRows.class);
+
+  private final NamespaceId namespaceId;
+  private final TableId tableId;
+
+  public DeleteRows(NamespaceId namespaceId, TableId tableId) {
+    this.namespaceId = namespaceId;
+    this.tableId = tableId;
+  }
+
+  @Override
+  public Repo<Manager> call(long tid, Manager manager) throws Exception {
+    MergeInfo mergeInfo = manager.getMergeInfo(tableId);
+    Preconditions.checkState(mergeInfo.getState() == MergeState.MERGING);
+    Preconditions.checkState(mergeInfo.isDelete());
+
+    deleteTablets(manager, mergeInfo);
+
+    manager.setMergeState(mergeInfo, MergeState.COMPLETE);
+
+    // TODO namespace id
+    return new FinishTableRangeOp(namespaceId, tableId);
+  }
+
+  private void deleteTablets(Manager manager, MergeInfo info) throws 
AccumuloException {
+    // First update the metadata and get the first and last tablets, which
+    // are fenced if necessary
+    final Pair<KeyExtent,KeyExtent> firstAndLastTablets =
+        updateMetadataRecordsForDelete(manager, info);
+
+    // Find the deletion start row (exclusive) for tablets that need to be 
actually deleted
+    // This will be null if deleting everything up until the end row or it 
will be
+    // the endRow of the first tablet as the first tablet should be kept and 
will have
+    // already been fenced if necessary
+    final Text deletionStartRow = 
getDeletionStartRow(firstAndLastTablets.getFirst());
+
+    // Find the deletion end row (inclusive) for tablets that need to be 
actually deleted
+    // This will be null if deleting everything after the starting row or it 
will be
+    // the prevEndRow of the last tablet as the last tablet should be kept and 
will have
+    // already been fenced if necessary
+    Text deletionEndRow = getDeletionEndRow(firstAndLastTablets.getSecond());
+
+    // check if there are any tablets to delete and if not return
+    if (!hasTabletsToDelete(firstAndLastTablets.getFirst(), 
firstAndLastTablets.getSecond())) {
+      log.trace("No tablets to delete for range {}, returning", 
info.getExtent());
+      return;
+    }
+
+    // Build an extent for the actual deletion range
+    final KeyExtent extent =
+        new KeyExtent(info.getExtent().tableId(), deletionEndRow, 
deletionStartRow);
+    log.debug("Tablet deletion range is {}", extent);
+    String targetSystemTable = extent.isMeta() ? RootTable.NAME : 
MetadataTable.NAME;
+    log.debug("Deleting tablets for {}", extent);
+    MetadataTime metadataTime = null;
+    KeyExtent followingTablet = null;
+    Set<TabletHostingGoal> goals = new HashSet<>();
+    if (extent.endRow() != null) {
+      Key nextExtent = new Key(extent.endRow()).followingKey(PartialKey.ROW);
+      followingTablet = getHighTablet(manager,
+          new KeyExtent(extent.tableId(), nextExtent.getRow(), 
extent.endRow()));
+      log.debug("Found following tablet {}", followingTablet);
+    }
+    try {
+      AccumuloClient client = manager.getContext();
+      ServerContext context = manager.getContext();
+      Ample ample = context.getAmple();
+      Text start = extent.prevEndRow();
+      if (start == null) {
+        start = new Text();
+      }
+      log.debug("Making file deletion entries for {}", extent);
+      Range deleteRange =
+          new Range(MetadataSchema.TabletsSection.encodeRow(extent.tableId(), 
start), false,
+              MetadataSchema.TabletsSection.encodeRow(extent.tableId(), 
extent.endRow()), true);
+      Scanner scanner = client.createScanner(targetSystemTable, 
Authorizations.EMPTY);
+      scanner.setRange(deleteRange);
+      
MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner);
+      
MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN.fetch(scanner);
+      
MetadataSchema.TabletsSection.HostingColumnFamily.GOAL_COLUMN.fetch(scanner);
+      
scanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
+      
scanner.fetchColumnFamily(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME);
+      Set<ReferenceFile> datafilesAndDirs = new TreeSet<>();
+      for (Map.Entry<Key,Value> entry : scanner) {
+        Key key = entry.getKey();
+        if 
(key.compareColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME)
 == 0) {
+          var stf = new 
StoredTabletFile(key.getColumnQualifierData().toString());
+          datafilesAndDirs.add(new ReferenceFile(stf.getTableId(), stf));
+          if (datafilesAndDirs.size() > 1000) {
+            ample.putGcFileAndDirCandidates(extent.tableId(), 
datafilesAndDirs);
+            datafilesAndDirs.clear();
+          }
+        } else if 
(MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN.hasColumns(key)) {
+          metadataTime = MetadataTime.parse(entry.getValue().toString());
+        } else if (key.compareColumnFamily(
+            MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME) == 
0) {
+          throw new IllegalStateException(
+              "Tablet " + key.getRow() + " is assigned during a merge!");
+        } else if 
(MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN
+            .hasColumns(key)) {
+          var allVolumesDirectory =
+              new AllVolumesDirectory(extent.tableId(), 
entry.getValue().toString());
+          datafilesAndDirs.add(allVolumesDirectory);
+          if (datafilesAndDirs.size() > 1000) {
+            ample.putGcFileAndDirCandidates(extent.tableId(), 
datafilesAndDirs);
+            datafilesAndDirs.clear();
+          }
+        } else if 
(MetadataSchema.TabletsSection.HostingColumnFamily.GOAL_COLUMN.hasColumns(key)) 
{
+          TabletHostingGoal thisGoal = 
TabletHostingGoalUtil.fromValue(entry.getValue());
+          goals.add(thisGoal);
+        }
+      }
+      ample.putGcFileAndDirCandidates(extent.tableId(), datafilesAndDirs);
+      BatchWriter bw = client.createBatchWriter(targetSystemTable);
+      try {
+        deleteTablets(info, deleteRange, bw, client);
+      } finally {
+        bw.close();
+      }
+
+      if (followingTablet != null) {
+        log.debug("Updating prevRow of {} to {}", followingTablet, 
extent.prevEndRow());
+        bw = client.createBatchWriter(targetSystemTable);
+        try {
+          Mutation m = new Mutation(followingTablet.toMetaRow());
+          
MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.put(m,
+              MetadataSchema.TabletsSection.TabletColumnFamily
+                  .encodePrevEndRow(extent.prevEndRow()));
+          bw.addMutation(m);
+          bw.flush();
+        } finally {
+          bw.close();
+        }
+      } else {
+        // Recreate the default tablet to hold the end of the table
+        MetadataTableUtil.addTablet(new KeyExtent(extent.tableId(), null, 
extent.prevEndRow()),
+            
MetadataSchema.TabletsSection.ServerColumnFamily.DEFAULT_TABLET_DIR_NAME,
+            manager.getContext(), metadataTime.getType(), 
manager.getManagerLock(),
+            getMergeHostingGoal(extent, goals));
+      }
+    } catch (RuntimeException | TableNotFoundException ex) {
+      throw new AccumuloException(ex);
+    }
+  }
+
+  private Pair<KeyExtent,KeyExtent> updateMetadataRecordsForDelete(Manager 
manager, MergeInfo info)
+      throws AccumuloException {
+    final KeyExtent range = info.getExtent();
+
+    String targetSystemTable = MetadataTable.NAME;
+    if (range.isMeta()) {
+      targetSystemTable = RootTable.NAME;
+    }
+    final Pair<KeyExtent,KeyExtent> startAndEndTablets;
+
+    final AccumuloClient client = manager.getContext();
+
+    try (BatchWriter bw = client.createBatchWriter(targetSystemTable)) {
+      final Text startRow = range.prevEndRow();
+      final Text endRow = range.endRow() != null
+          ? new Key(range.endRow()).followingKey(PartialKey.ROW).getRow() : 
null;
+
+      // Find the tablets that overlap the start and end row of the deletion 
range
+      // If the startRow is null then there will be an empty startTablet we 
don't need
+      // to fence a starting tablet as we are deleting everything up to the 
end tablet
+      // Likewise, if the endRow is null there will be an empty endTablet as 
we are deleting
+      // all tablets after the starting tablet
+      final Optional<TabletMetadata> startTablet =
+          Optional.ofNullable(startRow).flatMap(row -> 
loadTabletMetadata(manager, range.tableId(),
+              row, TabletMetadata.ColumnType.PREV_ROW, 
TabletMetadata.ColumnType.FILES));
+      final Optional<TabletMetadata> endTablet =
+          Optional.ofNullable(endRow).flatMap(row -> 
loadTabletMetadata(manager, range.tableId(),
+              row, TabletMetadata.ColumnType.PREV_ROW, 
TabletMetadata.ColumnType.FILES));
+
+      // Store the tablets in a Map if present so that if we have the same 
Tablet we
+      // only need to process the same tablet once when fencing
+      final SortedMap<KeyExtent,TabletMetadata> tabletMetadatas = new 
TreeMap<>();
+      startTablet.ifPresent(ft -> tabletMetadatas.put(ft.getExtent(), ft));
+      endTablet.ifPresent(lt -> tabletMetadatas.putIfAbsent(lt.getExtent(), 
lt));
+
+      // Capture the tablets to return them or null if not loaded
+      startAndEndTablets = new 
Pair<>(startTablet.map(TabletMetadata::getExtent).orElse(null),
+          endTablet.map(TabletMetadata::getExtent).orElse(null));
+
+      for (TabletMetadata tabletMetadata : tabletMetadatas.values()) {
+        final KeyExtent keyExtent = tabletMetadata.getExtent();
+
+        // Check if this tablet needs to have its files fenced for the deletion
+        if (needsFencingForDeletion(info, keyExtent)) {
+          log.debug("Found overlapping keyExtent {} for delete, fencing 
files.", keyExtent);
+
+          // Create the ranges for fencing the files, this takes the place of
+          // chop compactions and splits
+          final List<Range> ranges = createRangesForDeletion(tabletMetadata, 
range);
+          Preconditions.checkState(!ranges.isEmpty(),
+              "No ranges found that overlap deletion range.");
+
+          // Go through and fence each of the files that are part of the tablet
+          for (Map.Entry<StoredTabletFile,DataFileValue> entry : 
tabletMetadata.getFilesMap()
+              .entrySet()) {
+            final StoredTabletFile existing = entry.getKey();
+            final DataFileValue value = entry.getValue();
+
+            final Mutation m = new Mutation(keyExtent.toMetaRow());
+
+            // Go through each range that was created and modify the metadata 
for the file
+            // The end row should be inclusive for the current tablet and the 
previous end row
+            // should be exclusive for the start row.
+            final Set<StoredTabletFile> newFiles = new HashSet<>();
+            final Set<StoredTabletFile> existingFile = Set.of(existing);
+
+            for (Range fenced : ranges) {
+              // Clip range with the tablet range if the range already exists
+              fenced = existing.hasRange() ? existing.getRange().clip(fenced, 
true) : fenced;
+
+              // If null the range is disjoint which can happen if there are 
existing fenced files
+              // If the existing file is disjoint then later we will delete if 
the file is not part
+              // of the newFiles set which means it is disjoint with all ranges
+              if (fenced != null) {
+                final StoredTabletFile newFile = 
StoredTabletFile.of(existing.getPath(), fenced);
+                log.trace("Adding new file {} with range {}", 
newFile.getMetadataPath(),
+                    newFile.getRange());
+
+                // Add the new file to the newFiles set, it will be added 
later if it doesn't match
+                // the existing file already. We still need to add to the set 
to be checked later
+                // even if it matches the existing file as later the deletion 
logic will check to
+                // see if the existing file is part of this set before 
deleting. This is done to
+                // make sure the existing file isn't deleted unless it is not 
needed/disjoint
+                // with all ranges.
+                newFiles.add(newFile);
+              } else {
+                log.trace("Found a disjoint file {} with  range {} on delete",
+                    existing.getMetadataPath(), existing.getRange());
+              }
+            }
+
+            // If the existingFile is not contained in the newFiles set then 
we can delete it
+            Sets.difference(existingFile, newFiles).forEach(
+                delete -> 
m.putDelete(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME,
+                    existing.getMetadataText()));
+
+            // Add any new files that don't match the existingFile
+            // As of now we will only have at most 2 files as up to 2 ranges 
are created
+            final List<StoredTabletFile> filesToAdd =
+                new ArrayList<>(Sets.difference(newFiles, existingFile));
+            Preconditions.checkArgument(filesToAdd.size() <= 2,
+                "There should only be at most 2 StoredTabletFiles after 
computing new ranges.");
+
+            // If more than 1 new file then re-calculate the num entries and 
size
+            if (filesToAdd.size() == 2) {
+              // This splits up the values in half and makes sure they total 
the original
+              // values
+              final Pair<DataFileValue,DataFileValue> newDfvs = 
computeNewDfv(value);
+              m.put(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME,
+                  filesToAdd.get(0).getMetadataText(), 
newDfvs.getFirst().encodeAsValue());
+              m.put(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME,
+                  filesToAdd.get(1).getMetadataText(), 
newDfvs.getSecond().encodeAsValue());
+            } else {
+              // Will be 0 or 1 files
+              filesToAdd
+                  .forEach(newFile -> 
m.put(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME,
+                      newFile.getMetadataText(), value.encodeAsValue()));
+            }
+
+            if (!m.getUpdates().isEmpty()) {
+              bw.addMutation(m);
+            }
+          }
+        } else {
+          log.debug(
+              "Skipping metadata update on file for keyExtent {} for delete as 
not overlapping on rows.",
+              keyExtent);
+        }
+      }
+
+      bw.flush();
+
+      return startAndEndTablets;
+    } catch (Exception ex) {
+      throw new AccumuloException(ex);
+    }
+  }
+
+  // This method returns the deletion starting row (exclusive) for tablets that
+  // need to be actually deleted. If the startTablet is null then
+  // the deletion start row will just be null as all tablets are being deleted
+  // up to the end. Otherwise, this returns the endRow of the first tablet
+  // as the first tablet should be kept and will have been previously
+  // fenced if necessary
+  private Text getDeletionStartRow(final KeyExtent startTablet) {
+    if (startTablet == null) {
+      log.debug("First tablet for delete range is null");
+      return null;
+    }
+
+    final Text deletionStartRow = startTablet.endRow();
+    log.debug("Start row is {} for deletion", deletionStartRow);
+
+    return deletionStartRow;
+  }
+
+  // This method returns the deletion ending row (inclusive) for tablets that
+  // need to be actually deleted. If the endTablet is null then
+  // the deletion end row will just be null as all tablets are being deleted
+  // after the start row. Otherwise, this returns the prevEndRow of the last 
tablet
+  // as the last tablet should be kept and will have been previously
+  // fenced if necessary
+  private Text getDeletionEndRow(final KeyExtent endTablet) {
+    if (endTablet == null) {
+      log.debug("Last tablet for delete range is null");
+      return null;
+    }
+
+    Text deletionEndRow = endTablet.prevEndRow();
+    log.debug("Deletion end row is {}", deletionEndRow);
+
+    return deletionEndRow;
+  }
+
+  static TabletHostingGoal getMergeHostingGoal(KeyExtent range, 
Set<TabletHostingGoal> goals) {
+    TabletHostingGoal mergeHostingGoal = TabletHostingGoal.ONDEMAND;
+    if (range.isMeta() || goals.contains(TabletHostingGoal.ALWAYS)) {
+      mergeHostingGoal = TabletHostingGoal.ALWAYS;
+    } else if (goals.contains(TabletHostingGoal.NEVER)) {
+      mergeHostingGoal = TabletHostingGoal.NEVER;
+    }
+    return mergeHostingGoal;
+  }
+
+  // Divide each new DFV in half and make sure the sum equals the original
+  @VisibleForTesting
+  static Pair<DataFileValue,DataFileValue> computeNewDfv(DataFileValue value) {
+    final DataFileValue file1Value = new DataFileValue(Math.max(1, 
value.getSize() / 2),
+        Math.max(1, value.getNumEntries() / 2), value.getTime());
+
+    final DataFileValue file2Value =
+        new DataFileValue(Math.max(1, value.getSize() - file1Value.getSize()),
+            Math.max(1, value.getNumEntries() - file1Value.getNumEntries()), 
value.getTime());
+
+    return new Pair<>(file1Value, file2Value);
+  }
+
+  private Optional<TabletMetadata> loadTabletMetadata(Manager manager, TableId 
tabletId,
+      final Text row, TabletMetadata.ColumnType... columns) {
+    try (TabletsMetadata tabletsMetadata = 
manager.getContext().getAmple().readTablets()
+        .forTable(tabletId).overlapping(row, true, 
row).fetch(columns).build()) {
+      return tabletsMetadata.stream().findFirst();
+    }
+  }
+
+  // This method is used to detect if a tablet needs to be split/chopped for a 
delete
+  // Instead of performing a split or chop compaction, the tablet will have 
its files fenced.
+  private boolean needsFencingForDeletion(MergeInfo info, KeyExtent keyExtent) 
{
+    // Does this extent cover the end points of the delete?
+    final Predicate<Text> isWithin = r -> r != null && keyExtent.contains(r);
+    final Predicate<Text> isNotBoundary =
+        r -> !r.equals(keyExtent.endRow()) && 
!r.equals(keyExtent.prevEndRow());
+    final KeyExtent deleteRange = info.getExtent();
+
+    return (keyExtent.overlaps(deleteRange) && Stream
+        .of(deleteRange.prevEndRow(), 
deleteRange.endRow()).anyMatch(isWithin.and(isNotBoundary)))
+        || info.needsToBeChopped(keyExtent);
+  }
+
+  // Returns the row of the key that follows the given row (PartialKey.ROW),
+  // or null when the given row is null
+  private Text followingRow(Text row) {
+    if (row == null) {
+      return null;
+    }
+    return new Key(row).followingKey(PartialKey.ROW).getRow();
+  }
+
+  // Instead of splitting or chopping tablets for a delete we instead create 
ranges
+  // to exclude the portion of the tablet that should be deleted
+  private List<Range> createRangesForDeletion(TabletMetadata tabletMetadata,
+      final KeyExtent deleteRange) {
+    final KeyExtent tabletExtent = tabletMetadata.getExtent();
+
+    // If the delete range wholly contains the tablet being deleted then there 
is no range to clip
+    // files to because the files should be completely dropped.
+    Preconditions.checkArgument(!deleteRange.contains(tabletExtent), "delete 
range:%s tablet:%s",
+        deleteRange, tabletExtent);
+
+    final List<Range> ranges = new ArrayList<>();
+
+    if (deleteRange.overlaps(tabletExtent)) {
+      if (deleteRange.prevEndRow() != null
+          && tabletExtent.contains(followingRow(deleteRange.prevEndRow()))) {
+        log.trace("Fencing tablet {} files to ({},{}]", tabletExtent, 
tabletExtent.prevEndRow(),
+            deleteRange.prevEndRow());
+        ranges.add(new Range(tabletExtent.prevEndRow(), false, 
deleteRange.prevEndRow(), true));
+      }
+
+      // This covers the case of when a deletion range overlaps the last 
tablet. We need to create a
+      // range that excludes the deletion.
+      if (deleteRange.endRow() != null
+          && tabletMetadata.getExtent().contains(deleteRange.endRow())) {
+        log.trace("Fencing tablet {} files to ({},{}]", tabletExtent, 
deleteRange.endRow(),
+            tabletExtent.endRow());
+        ranges.add(new Range(deleteRange.endRow(), false, 
tabletExtent.endRow(), true));
+      }
+    } else {
+      log.trace("Fencing tablet {} files to itself because it does not overlap 
delete range",
+          tabletExtent);
+      ranges.add(tabletExtent.toDataRange());
+    }
+
+    return ranges;
+  }
+
+  private static boolean isFirstTabletInTable(KeyExtent tablet) {
+    return tablet != null && tablet.prevEndRow() == null;
+  }
+
+  private static boolean isLastTabletInTable(KeyExtent tablet) {
+    return tablet != null && tablet.endRow() == null;
+  }
+
+  private static boolean areContiguousTablets(KeyExtent firstTablet, KeyExtent 
lastTablet) {
+    return firstTablet != null && lastTablet != null
+        && Objects.equals(firstTablet.endRow(), lastTablet.prevEndRow());
+  }
+
+  private boolean hasTabletsToDelete(final KeyExtent firstTabletInRange,
+      final KeyExtent lastTableInRange) {
+    // If the tablets are equal (and not null) then the deletion range is just 
part of 1 tablet
+    // which will be fenced so there are no tablets to delete. The null check 
is because if both
+    // are null then we are just deleting everything, so we do have tablets to 
delete
+    if (Objects.equals(firstTabletInRange, lastTableInRange) && 
firstTabletInRange != null) {
+      log.trace(
+          "No tablets to delete, firstTablet {} equals lastTablet {} in 
deletion range and was fenced.",
+          firstTabletInRange, lastTableInRange);
+      return false;
+      // If the lastTablet of the deletion range is the first tablet of the 
table it has been fenced
+      // already so nothing to actually delete before it
+    } else if (isFirstTabletInTable(lastTableInRange)) {
+      log.trace(
+          "No tablets to delete, lastTablet {} in deletion range is the first 
tablet of the table and was fenced.",
+          lastTableInRange);
+      return false;
+      // If the firstTablet of the deletion range is the last tablet of the 
table it has been fenced
+      // already so nothing to actually delete after it
+    } else if (isLastTabletInTable(firstTabletInRange)) {
+      log.trace(
+          "No tablets to delete, firstTablet {} in deletion range is the last 
tablet of the table and was fenced.",
+          firstTabletInRange);
+      return false;
+      // If the firstTablet and lastTablet are contiguous tablets then there 
is nothing to delete as
+      // each will be fenced and nothing between
+    } else if (areContiguousTablets(firstTabletInRange, lastTableInRange)) {
+      log.trace(
+          "No tablets to delete, firstTablet {} and lastTablet {} in deletion 
range are contiguous and were fenced.",
+          firstTabletInRange, lastTableInRange);
+      return false;
+    }
+
+    return true;
+  }
+
+  static void deleteTablets(MergeInfo info, Range scanRange, BatchWriter bw, 
AccumuloClient client)
+      throws TableNotFoundException, MutationsRejectedException {
+    Scanner scanner;
+    Mutation m;
+    // Delete everything in the other tablets
+    // group all deletes for a tablet into one mutation, this makes tablets
+    // either disappear entirely or not at all.. this is important for the case
+    // where the process terminates in the loop below...
+    scanner = client.createScanner(info.getExtent().isMeta() ? RootTable.NAME 
: MetadataTable.NAME,
+        Authorizations.EMPTY);
+    log.debug("Deleting range {}", scanRange);
+    scanner.setRange(scanRange);
+    RowIterator rowIter = new RowIterator(scanner);
+    while (rowIter.hasNext()) {
+      Iterator<Map.Entry<Key,Value>> row = rowIter.next();
+      m = null;
+      while (row.hasNext()) {
+        Map.Entry<Key,Value> entry = row.next();
+        Key key = entry.getKey();
+
+        if (m == null) {
+          m = new Mutation(key.getRow());
+        }
+
+        m.putDelete(key.getColumnFamily(), key.getColumnQualifier());
+        log.debug("deleting entry {}", key);
+      }
+      bw.addMutation(m);
+    }
+
+    bw.flush();
+  }
+
+  static KeyExtent getHighTablet(Manager manager, KeyExtent range) throws 
AccumuloException {
+    try {
+      AccumuloClient client = manager.getContext();
+      Scanner scanner = client.createScanner(range.isMeta() ? RootTable.NAME : 
MetadataTable.NAME,
+          Authorizations.EMPTY);
+      
MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
+      KeyExtent start = new KeyExtent(range.tableId(), range.endRow(), null);
+      scanner.setRange(new Range(start.toMetaRow(), null));
+      Iterator<Map.Entry<Key,Value>> iterator = scanner.iterator();
+      if (!iterator.hasNext()) {
+        throw new AccumuloException("No last tablet for a merge " + range);
+      }
+      Map.Entry<Key,Value> entry = iterator.next();
+      KeyExtent highTablet = KeyExtent.fromMetaPrevRow(entry);
+      if (!highTablet.tableId().equals(range.tableId())) {
+        throw new AccumuloException("No last tablet for merge " + range + " " 
+ highTablet);
+      }
+      return highTablet;
+    } catch (Exception ex) {
+      throw new AccumuloException("Unexpected failure finding the last tablet 
for a merge " + range,
+          ex);
+    }
+  }
+}
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/TableRangeOpWait.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/FinishTableRangeOp.java
similarity index 86%
rename from 
server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/TableRangeOpWait.java
rename to 
server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/FinishTableRangeOp.java
index 9b73d4c538..574335dcf3 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/TableRangeOpWait.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/FinishTableRangeOp.java
@@ -25,11 +25,12 @@ import org.apache.accumulo.manager.Manager;
 import org.apache.accumulo.manager.tableOps.ManagerRepo;
 import org.apache.accumulo.manager.tableOps.Utils;
 import org.apache.accumulo.server.manager.state.MergeInfo;
-import org.apache.accumulo.server.manager.state.MergeState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
+ * ELASTICITY_TODO edit these docs which are pre elasticity changes. Best done 
after #3763
+ *
  * Merge makes things hard.
  *
  * Typically, a client will read the list of tablets, and begin an operation 
on that tablet at the
@@ -46,26 +47,18 @@ import org.slf4j.LoggerFactory;
  * Normal operations, like bulk imports, will grab the read lock and prevent 
merges (writes) while
  * they run. Merge operations will lock out some operations while they run.
  */
-class TableRangeOpWait extends ManagerRepo {
-  private static final Logger log = 
LoggerFactory.getLogger(TableRangeOpWait.class);
+class FinishTableRangeOp extends ManagerRepo {
+  private static final Logger log = 
LoggerFactory.getLogger(FinishTableRangeOp.class);
 
   private static final long serialVersionUID = 1L;
   private TableId tableId;
   private NamespaceId namespaceId;
 
-  public TableRangeOpWait(NamespaceId namespaceId, TableId tableId) {
+  public FinishTableRangeOp(NamespaceId namespaceId, TableId tableId) {
     this.tableId = tableId;
     this.namespaceId = namespaceId;
   }
 
-  @Override
-  public long isReady(long tid, Manager env) {
-    if (!env.getMergeInfo(tableId).getState().equals(MergeState.NONE)) {
-      return 50;
-    }
-    return 0;
-  }
-
   @Override
   public Repo<Manager> call(long tid, Manager manager) throws Exception {
     MergeInfo mergeInfo = manager.getMergeInfo(tableId);
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java
new file mode 100644
index 0000000000..d6a7de9716
--- /dev/null
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager.tableOps.merge;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.TabletHostingGoal;
+import org.apache.accumulo.core.clientImpl.TabletHostingGoalUtil;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.NamespaceId;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.fate.Repo;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.StoredTabletFile;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.metadata.schema.MetadataTime;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.manager.tableOps.ManagerRepo;
+import org.apache.accumulo.server.gc.AllVolumesDirectory;
+import org.apache.accumulo.server.manager.state.MergeInfo;
+import org.apache.accumulo.server.manager.state.MergeState;
+import org.apache.accumulo.server.tablets.TabletTime;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class MergeTablets extends ManagerRepo {
+
+  private static final long serialVersionUID = 1L;
+
+  private static final Logger log = 
LoggerFactory.getLogger(MergeTablets.class);
+
+  private final NamespaceId namespaceId;
+  private final TableId tableId;
+
+  public MergeTablets(NamespaceId namespaceId, TableId tableId) {
+    this.namespaceId = namespaceId;
+    this.tableId = tableId;
+  }
+
+  @Override
+  public Repo<Manager> call(long tid, Manager manager) throws Exception {
+    MergeInfo mergeInfo = manager.getMergeInfo(tableId);
+    Preconditions.checkState(mergeInfo.getState() == MergeState.MERGING);
+    Preconditions.checkState(!mergeInfo.isDelete());
+
+    var extent = mergeInfo.getExtent();
+    long tabletCount;
+
+    try (var tabletMeta = 
manager.getContext().getAmple().readTablets().forTable(extent.tableId())
+        .overlapping(extent.prevEndRow(), 
extent.endRow()).fetch(TabletMetadata.ColumnType.PREV_ROW)
+        .checkConsistency().build()) {
+      tabletCount = tabletMeta.stream().count();
+    }
+
+    if (tabletCount > 1) {
+      mergeMetadataRecords(manager, mergeInfo);
+    }
+
+    return new FinishTableRangeOp(namespaceId, tableId);
+  }
+
+  private void mergeMetadataRecords(Manager manager, MergeInfo info) throws 
AccumuloException {
+    KeyExtent range = info.getExtent();
+    log.debug("Merging metadata for {}", range);
+    KeyExtent stop = DeleteRows.getHighTablet(manager, range);
+    log.debug("Highest tablet is {}", stop);
+    Value firstPrevRowValue = null;
+    Text stopRow = stop.toMetaRow();
+    Text start = range.prevEndRow();
+    if (start == null) {
+      start = new Text();
+    }
+    Range scanRange = new 
Range(MetadataSchema.TabletsSection.encodeRow(range.tableId(), start),
+        false, stopRow, false);
+    String targetSystemTable = MetadataTable.NAME;
+    if (range.isMeta()) {
+      targetSystemTable = RootTable.NAME;
+    }
+    Set<TabletHostingGoal> goals = new HashSet<>();
+
+    AccumuloClient client = manager.getContext();
+
+    KeyExtent stopExtent = KeyExtent.fromMetaRow(stop.toMetaRow());
+    KeyExtent previousKeyExtent = null;
+    KeyExtent lastExtent = null;
+
+    try (BatchWriter bw = client.createBatchWriter(targetSystemTable)) {
+      long fileCount = 0;
+      // Make file entries in highest tablet
+      Scanner scanner = client.createScanner(targetSystemTable, 
Authorizations.EMPTY);
+      // Update to set the range to include the highest tablet
+      scanner.setRange(new 
Range(MetadataSchema.TabletsSection.encodeRow(range.tableId(), start),
+          false, stopRow, true));
+      
MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
+      
MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN.fetch(scanner);
+      
MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner);
+      
MetadataSchema.TabletsSection.HostingColumnFamily.GOAL_COLUMN.fetch(scanner);
+      
scanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
+      Mutation m = new Mutation(stopRow);
+      MetadataTime maxLogicalTime = null;
+      for (Map.Entry<Key,Value> entry : scanner) {
+        Key key = entry.getKey();
+        Value value = entry.getValue();
+
+        final KeyExtent keyExtent = KeyExtent.fromMetaRow(key.getRow());
+
+        // Keep track of the last Key Extent seen so we can use it to fence
+        // off RFiles when merging the metadata
+        if (lastExtent != null && !keyExtent.equals(lastExtent)) {
+          previousKeyExtent = lastExtent;
+        }
+
+        // Special case to handle the highest/stop tablet, which is where 
files are
+        // merged to. The existing merge code won't delete files from this 
tablet
+        // so we need to handle the deletes in this tablet when fencing files.
+        // We may be able to make this simpler in the future.
+        if (keyExtent.equals(stopExtent)) {
+          if (previousKeyExtent != null && key.getColumnFamily()
+              
.equals(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME)) {
+
+            // Fence off existing files by the end row of the previous tablet 
and current tablet
+            final StoredTabletFile existing = 
StoredTabletFile.of(key.getColumnQualifier());
+            // The end row should be inclusive for the current tablet and the 
previous end row
+            // should be exclusive for the start row
+            Range fenced = new Range(previousKeyExtent.endRow(), false, 
keyExtent.endRow(), true);
+
+            // Clip range if exists
+            fenced = existing.hasRange() ? existing.getRange().clip(fenced) : 
fenced;
+
+            final StoredTabletFile newFile = 
StoredTabletFile.of(existing.getPath(), fenced);
+            // If the existing metadata does not match then we need to delete 
the old
+            // and replace with a new range
+            if (!existing.equals(newFile)) {
+              
m.putDelete(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME,
+                  existing.getMetadataText());
+              m.put(key.getColumnFamily(), newFile.getMetadataText(), value);
+            }
+
+            fileCount++;
+          }
+          // For the highest tablet we only care about the DataFileColumnFamily
+          continue;
+        }
+
+        // Handle metadata updates for all other tablets except the highest 
tablet
+        // Ranges are created for the files and then added to the highest 
tablet in
+        // the merge range. Deletes are handled later for the old files when 
the tablets
+        // are removed.
+        if 
(key.getColumnFamily().equals(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME))
 {
+          final StoredTabletFile existing = 
StoredTabletFile.of(key.getColumnQualifier());
+
+          // Fence off files by the previous tablet and current tablet that is 
being merged
+          // The end row should be inclusive for the current tablet and the 
previous end row should
+          // be exclusive for the start row.
+          Range fenced = new Range(previousKeyExtent != null ? 
previousKeyExtent.endRow() : null,
+              false, keyExtent.endRow(), true);
+
+          // Clip range with the tablet range if the range already exists
+          fenced = existing.hasRange() ? existing.getRange().clip(fenced) : 
fenced;
+
+          // Move the file and range to the last tablet
+          StoredTabletFile newFile = StoredTabletFile.of(existing.getPath(), 
fenced);
+          m.put(key.getColumnFamily(), newFile.getMetadataText(), value);
+
+          fileCount++;
+        } else if 
(MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)
+            && firstPrevRowValue == null) {
+          log.debug("prevRow entry for lowest tablet is {}", value);
+          firstPrevRowValue = new Value(value);
+        } else if 
(MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN.hasColumns(key)) {
+          maxLogicalTime =
+              TabletTime.maxMetadataTime(maxLogicalTime, 
MetadataTime.parse(value.toString()));
+        } else if 
(MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN
+            .hasColumns(key)) {
+          var allVolumesDir = new AllVolumesDirectory(range.tableId(), 
value.toString());
+          
bw.addMutation(manager.getContext().getAmple().createDeleteMutation(allVolumesDir));
+        } else if 
(MetadataSchema.TabletsSection.HostingColumnFamily.GOAL_COLUMN.hasColumns(key)) 
{
+          TabletHostingGoal thisGoal = TabletHostingGoalUtil.fromValue(value);
+          goals.add(thisGoal);
+        }
+
+        lastExtent = keyExtent;
+      }
+
+      // read the logical time from the last tablet in the merge range, it is 
not included in
+      // the loop above
+      scanner = client.createScanner(targetSystemTable, Authorizations.EMPTY);
+      scanner.setRange(new Range(stopRow));
+      
MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN.fetch(scanner);
+      
MetadataSchema.TabletsSection.HostingColumnFamily.GOAL_COLUMN.fetch(scanner);
+      
scanner.fetchColumnFamily(MetadataSchema.TabletsSection.ExternalCompactionColumnFamily.NAME);
+      Set<String> extCompIds = new HashSet<>();
+      for (Map.Entry<Key,Value> entry : scanner) {
+        if (MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN
+            .hasColumns(entry.getKey())) {
+          maxLogicalTime = TabletTime.maxMetadataTime(maxLogicalTime,
+              MetadataTime.parse(entry.getValue().toString()));
+        } else if 
(MetadataSchema.TabletsSection.ExternalCompactionColumnFamily.NAME
+            .equals(entry.getKey().getColumnFamily())) {
+          extCompIds.add(entry.getKey().getColumnQualifierData().toString());
+        } else if 
(MetadataSchema.TabletsSection.HostingColumnFamily.GOAL_COLUMN
+            .hasColumns(entry.getKey())) {
+          TabletHostingGoal thisGoal = 
TabletHostingGoalUtil.fromValue(entry.getValue());
+          goals.add(thisGoal);
+        }
+      }
+
+      if (maxLogicalTime != null) {
+        MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN.put(m,
+            new Value(maxLogicalTime.encode()));
+      }
+
+      // delete any entries for external compactions
+      extCompIds.forEach(ecid -> m
+          
.putDelete(MetadataSchema.TabletsSection.ExternalCompactionColumnFamily.STR_NAME,
 ecid));
+
+      // Set the TabletHostingGoal for this tablet based on the goals of the 
other tablets in
+      // the merge range. ALWAYS takes priority over NEVER.
+      TabletHostingGoal mergeHostingGoal = 
DeleteRows.getMergeHostingGoal(range, goals);
+      MetadataSchema.TabletsSection.HostingColumnFamily.GOAL_COLUMN.put(m,
+          TabletHostingGoalUtil.toValue(mergeHostingGoal));
+
+      if (!m.getUpdates().isEmpty()) {
+        bw.addMutation(m);
+      }
+
+      bw.flush();
+
+      log.debug("Moved {} files to {}", fileCount, stop);
+
+      if (firstPrevRowValue == null) {
+        log.debug("tablet already merged");
+        return;
+      }
+
+      stop = new KeyExtent(stop.tableId(), stop.endRow(),
+          
MetadataSchema.TabletsSection.TabletColumnFamily.decodePrevEndRow(firstPrevRowValue));
+      Mutation updatePrevRow =
+          
MetadataSchema.TabletsSection.TabletColumnFamily.createPrevRowMutation(stop);
+      log.debug("Setting the prevRow for last tablet: {}", stop);
+      bw.addMutation(updatePrevRow);
+      bw.flush();
+
+      DeleteRows.deleteTablets(info, scanRange, bw, client);
+
+    } catch (Exception ex) {
+      throw new AccumuloException(ex);
+    }
+  }
+}
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/TableRangeOp.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/TableRangeOp.java
index 2990b9e59f..fa1f587e3a 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/TableRangeOp.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/TableRangeOp.java
@@ -86,12 +86,14 @@ public class TableRangeOp extends ManagerRepo {
 
     MergeInfo info = env.getMergeInfo(tableId);
 
+    // ELASTICITY_TODO can remove MergeState and MergeInfo once opid is set, 
these only exist now
+    // to get tablets unassigned. Once an opid is set on a tablet it will be 
unassigned. See #3763
     if (info.getState() == MergeState.NONE) {
       KeyExtent range = new KeyExtent(tableId, end, start);
-      env.setMergeState(new MergeInfo(range, op), MergeState.STARTED);
+      env.setMergeState(new MergeInfo(range, op), 
MergeState.WAITING_FOR_OFFLINE);
     }
 
-    return new TableRangeOpWait(namespaceId, tableId);
+    return new WaitForOffline(namespaceId, tableId);
   }
 
   @Override
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/WaitForOffline.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/WaitForOffline.java
new file mode 100644
index 0000000000..c55ff213c5
--- /dev/null
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/WaitForOffline.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager.tableOps.merge;
+
+import org.apache.accumulo.core.data.NamespaceId;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.fate.FateTxId;
+import org.apache.accumulo.core.fate.Repo;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.manager.tableOps.ManagerRepo;
+import org.apache.accumulo.server.manager.state.MergeInfo;
+import org.apache.accumulo.server.manager.state.MergeState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class WaitForOffline extends ManagerRepo {
+
+  private static final Logger log = 
LoggerFactory.getLogger(WaitForOffline.class);
+
+  private static final long serialVersionUID = 1L;
+
+  private final NamespaceId namespaceId;
+  private final TableId tableId;
+
+  public WaitForOffline(NamespaceId namespaceId, TableId tableId) {
+    this.namespaceId = namespaceId;
+    this.tableId = tableId;
+  }
+
+  @Override
+  public long isReady(long tid, Manager env) throws Exception {
+    MergeInfo mergeInfo = env.getMergeInfo(tableId);
+    var extent = mergeInfo.getExtent();
+
+    long tabletsWithLocations;
+
+    try (var tabletMeta = 
env.getContext().getAmple().readTablets().forTable(extent.tableId())
+        .overlapping(extent.prevEndRow(), extent.endRow())
+        .fetch(TabletMetadata.ColumnType.PREV_ROW, 
TabletMetadata.ColumnType.LOCATION)
+        .checkConsistency().build()) {
+      tabletsWithLocations = tabletMeta.stream().filter(tm -> tm.getLocation() 
!= null).count();
+    }
+
+    log.info("{} waiting for {} tablets with locations", 
FateTxId.formatTid(tid),
+        tabletsWithLocations);
+
+    if (tabletsWithLocations > 0) {
+      return 1000;
+    } else {
+      return 0;
+    }
+  }
+
+  @Override
+  public Repo<Manager> call(long tid, Manager env) throws Exception {
+    MergeInfo mergeInfo = env.getMergeInfo(tableId);
+    env.setMergeState(mergeInfo, MergeState.MERGING);
+    if (mergeInfo.isDelete()) {
+      return new DeleteRows(namespaceId, tableId);
+    } else {
+      return new MergeTablets(namespaceId, tableId);
+    }
+  }
+}
diff --git 
a/server/manager/src/test/java/org/apache/accumulo/manager/TabletGroupWatcherTest.java
 
b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/DeleteRowsTest.java
similarity index 87%
rename from 
server/manager/src/test/java/org/apache/accumulo/manager/TabletGroupWatcherTest.java
rename to 
server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/DeleteRowsTest.java
index 35026efbe9..0dfe5ee6a5 100644
--- 
a/server/manager/src/test/java/org/apache/accumulo/manager/TabletGroupWatcherTest.java
+++ 
b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/DeleteRowsTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.accumulo.manager;
+package org.apache.accumulo.manager.tableOps.merge;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
@@ -24,12 +24,12 @@ import 
org.apache.accumulo.core.metadata.schema.DataFileValue;
 import org.apache.accumulo.core.util.Pair;
 import org.junit.jupiter.api.Test;
 
-public class TabletGroupWatcherTest {
+public class DeleteRowsTest {
 
   @Test
   public void testComputeNewDfvEven() {
     DataFileValue original = new DataFileValue(20, 10, 100);
-    Pair<DataFileValue,DataFileValue> newValues = 
TabletGroupWatcher.computeNewDfv(original);
+    Pair<DataFileValue,DataFileValue> newValues = 
DeleteRows.computeNewDfv(original);
 
     assertEquals(10, newValues.getFirst().getSize());
     assertEquals(5, newValues.getFirst().getNumEntries());
@@ -42,7 +42,7 @@ public class TabletGroupWatcherTest {
   @Test
   public void testComputeNewDfvOdd() {
     DataFileValue original = new DataFileValue(21, 11, 100);
-    Pair<DataFileValue,DataFileValue> newValues = 
TabletGroupWatcher.computeNewDfv(original);
+    Pair<DataFileValue,DataFileValue> newValues = 
DeleteRows.computeNewDfv(original);
 
     assertEquals(10, newValues.getFirst().getSize());
     assertEquals(5, newValues.getFirst().getNumEntries());
@@ -55,7 +55,7 @@ public class TabletGroupWatcherTest {
   @Test
   public void testComputeNewDfvSmall() {
     DataFileValue original = new DataFileValue(1, 2, 100);
-    Pair<DataFileValue,DataFileValue> newValues = 
TabletGroupWatcher.computeNewDfv(original);
+    Pair<DataFileValue,DataFileValue> newValues = 
DeleteRows.computeNewDfv(original);
 
     assertEquals(1, newValues.getFirst().getSize());
     assertEquals(1, newValues.getFirst().getNumEntries());
diff --git 
a/test/src/main/java/org/apache/accumulo/test/manager/MergeStateIT.java 
b/test/src/main/java/org/apache/accumulo/test/manager/MergeStateIT.java
deleted file mode 100644
index 1806f4c638..0000000000
--- a/test/src/main/java/org/apache/accumulo/test/manager/MergeStateIT.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.test.manager;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-
-import org.apache.accumulo.core.client.Accumulo;
-import org.apache.accumulo.core.client.AccumuloClient;
-import org.apache.accumulo.core.client.BatchDeleter;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.MutationsRejectedException;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.client.admin.NewTableConfiguration;
-import org.apache.accumulo.core.client.admin.TabletHostingGoal;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.TableId;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.dataImpl.KeyExtent;
-import org.apache.accumulo.core.manager.state.TabletManagement;
-import 
org.apache.accumulo.core.manager.state.TabletManagement.ManagementAction;
-import org.apache.accumulo.core.manager.thrift.ManagerState;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.TServerInstance;
-import org.apache.accumulo.core.metadata.TabletState;
-import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
-import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
-import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
-import org.apache.accumulo.core.metadata.schema.TabletMetadata;
-import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
-import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.security.TablePermission;
-import org.apache.accumulo.core.tabletserver.log.LogEntry;
-import org.apache.accumulo.manager.state.MergeStats;
-import org.apache.accumulo.server.ServerContext;
-import org.apache.accumulo.server.manager.state.Assignment;
-import org.apache.accumulo.server.manager.state.CurrentState;
-import org.apache.accumulo.server.manager.state.MergeInfo;
-import org.apache.accumulo.server.manager.state.MergeState;
-import org.apache.accumulo.server.manager.state.TabletStateStore;
-import org.apache.accumulo.test.functional.ConfigurableMacBase;
-import org.apache.hadoop.io.Text;
-import org.junit.jupiter.api.Test;
-
-import com.google.common.net.HostAndPort;
-
-public class MergeStateIT extends ConfigurableMacBase {
-
-  private static class MockCurrentState implements CurrentState {
-
-    TServerInstance someTServer =
-        new TServerInstance(HostAndPort.fromParts("127.0.0.1", 1234), 
0x123456);
-    MergeInfo mergeInfo;
-
-    MockCurrentState(MergeInfo info) {
-      this.mergeInfo = info;
-    }
-
-    @Override
-    public Set<TableId> onlineTables() {
-      return Collections.singleton(TableId.of("t"));
-    }
-
-    @Override
-    public Set<TServerInstance> onlineTabletServers() {
-      return Collections.singleton(someTServer);
-    }
-
-    @Override
-    public Map<String,Set<TServerInstance>> tServerResourceGroups() {
-      return new HashMap<>();
-    }
-
-    @Override
-    public Collection<MergeInfo> merges() {
-      return Collections.singleton(mergeInfo);
-    }
-
-    @Override
-    public Set<KeyExtent> migrationsSnapshot() {
-      return Collections.emptySet();
-    }
-
-    @Override
-    public ManagerState getManagerState() {
-      return ManagerState.NORMAL;
-    }
-
-    @Override
-    public Map<Long,Map<String,String>> getCompactionHints() {
-      return Map.of();
-    }
-
-    @Override
-    public Set<TServerInstance> shutdownServers() {
-      return Collections.emptySet();
-    }
-
-  }
-
-  private static void update(AccumuloClient c, Mutation m)
-      throws TableNotFoundException, MutationsRejectedException {
-    try (BatchWriter bw = c.createBatchWriter(MetadataTable.NAME)) {
-      bw.addMutation(m);
-    }
-  }
-
-  @Test
-  public void test() throws Exception {
-    ServerContext context = getServerContext();
-    try (AccumuloClient accumuloClient = 
Accumulo.newClient().from(getClientProperties()).build()) {
-      
accumuloClient.securityOperations().grantTablePermission(accumuloClient.whoami(),
-          MetadataTable.NAME, TablePermission.WRITE);
-      BatchWriter bw = accumuloClient.createBatchWriter(MetadataTable.NAME);
-
-      TreeSet<Text> splits = new TreeSet<>();
-      splits.add(new Text("a"));
-      splits.add(new Text("e"));
-      splits.add(new Text("j"));
-      splits.add(new Text("o"));
-      splits.add(new Text("t"));
-      splits.add(new Text("z"));
-      NewTableConfiguration ntc = new NewTableConfiguration();
-      ntc.withSplits(splits);
-      accumuloClient.tableOperations().create("merge_test_table");
-      TableId tableId =
-          
TableId.of(accumuloClient.tableOperations().tableIdMap().get("merge_test_table"));
-
-      Text pr = null;
-      for (Text split : splits) {
-        Mutation prevRow =
-            TabletColumnFamily.createPrevRowMutation(new KeyExtent(tableId, 
split, pr));
-        prevRow.put(CurrentLocationColumnFamily.NAME, new Text("123456"),
-            new Value("127.0.0.1:1234"));
-        bw.addMutation(prevRow);
-        pr = split;
-      }
-      bw.close();
-
-      // Read out the TabletLocationStates
-      MockCurrentState state =
-          new MockCurrentState(new MergeInfo(new KeyExtent(tableId, new 
Text("p"), new Text("e")),
-              MergeInfo.Operation.MERGE));
-
-      // Verify the tablet state: hosted, and count
-      TabletStateStore metaDataStateStore =
-          TabletStateStore.getStoreForLevel(DataLevel.USER, context, state);
-      int count = 0;
-      for (TabletManagement mti : metaDataStateStore) {
-        if (mti != null) {
-          assertEquals(1, mti.actions.size());
-          assertEquals(ManagementAction.NEEDS_LOCATION_UPDATE, 
mti.getActions().iterator().next());
-          count++;
-        }
-      }
-      assertEquals(6, count);
-
-      // Create the hole
-      // Split the tablet at one end of the range
-      Mutation m = TabletColumnFamily
-          .createPrevRowMutation(new KeyExtent(tableId, new Text("t"), new 
Text("p")));
-      TabletColumnFamily.SPLIT_RATIO_COLUMN.put(m, new Value("0.5"));
-      TabletColumnFamily.OLD_PREV_ROW_COLUMN.put(m,
-          TabletColumnFamily.encodePrevEndRow(new Text("o")));
-      update(accumuloClient, m);
-
-      // ELASTICITY_TODO: Tried to fix this up, not sure how this works
-
-      // do the state check
-      MergeStats stats = scan(state, metaDataStateStore);
-      // MergeState newState = stats.nextMergeState(accumuloClient, state);
-      // assertEquals(MergeState.WAITING_FOR_OFFLINE, newState);
-
-      // unassign the tablets
-      try (BatchDeleter deleter =
-          accumuloClient.createBatchDeleter(MetadataTable.NAME, 
Authorizations.EMPTY, 1000)) {
-        deleter.fetchColumnFamily(CurrentLocationColumnFamily.NAME);
-        deleter.setRanges(Collections.singletonList(new Range()));
-        deleter.delete();
-      }
-
-      // now we should be ready to merge but, we have inconsistent metadata
-      stats = scan(state, metaDataStateStore);
-      assertEquals(MergeState.WAITING_FOR_OFFLINE, 
stats.nextMergeState(accumuloClient, state));
-
-      // finish the split
-      KeyExtent tablet = new KeyExtent(tableId, new Text("p"), new Text("o"));
-      m = TabletColumnFamily.createPrevRowMutation(tablet);
-      TabletColumnFamily.SPLIT_RATIO_COLUMN.put(m, new Value("0.5"));
-      update(accumuloClient, m);
-      metaDataStateStore
-          .setLocations(Collections.singletonList(new Assignment(tablet, 
state.someTServer, null)));
-
-      stats = scan(state, metaDataStateStore);
-      assertEquals(MergeState.WAITING_FOR_OFFLINE, 
stats.nextMergeState(accumuloClient, state));
-
-      // take it offline
-      m = TabletColumnFamily.createPrevRowMutation(tablet);
-      List<LogEntry> walogs = Collections.emptyList();
-      metaDataStateStore.unassign(
-          Collections.singletonList(TabletMetadata.builder(tablet)
-              .putLocation(Location.current(state.someTServer))
-              .putHostingGoal(TabletHostingGoal.ALWAYS).build(ColumnType.LAST, 
ColumnType.SUSPEND)),
-          null);
-
-      // now we can split
-      stats = scan(state, metaDataStateStore);
-      assertEquals(MergeState.MERGING, stats.nextMergeState(accumuloClient, 
state));
-    }
-  }
-
-  private MergeStats scan(MockCurrentState state, TabletStateStore 
metaDataStateStore) {
-    MergeStats stats = new MergeStats(state.mergeInfo);
-    stats.getMergeInfo().setState(MergeState.WAITING_FOR_OFFLINE);
-    for (TabletManagement tm : metaDataStateStore) {
-      TabletMetadata tabletMetadata = tm.getTabletMetadata();
-      stats.update(tm.getTabletMetadata().getExtent(),
-          TabletState.compute(tabletMetadata, state.onlineTabletServers()));
-    }
-    return stats;
-  }
-}

Reply via email to