This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 9fa27a6d4d Modify merge and deleterows to use conditional mutations 
(#3875)
9fa27a6d4d is described below

commit 9fa27a6d4d8542a48c2a74849b169269e0a4609c
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Tue Oct 24 20:49:29 2023 -0400

    Modify merge and deleterows to use conditional mutations (#3875)
    
    Added a new FATE repo that acquires operations ids on all of the tablets
    in the merge range.  This causes the tablet group watcher to unload
    tablets.  This new repo waits for all tablets to not have a location.
    
    Modified the existing merge and delete rows FATE repos to use ample and
    conditional mutations.  Using ample simplified the code.
    
    Modified the delete rows FATE repo to only delete or fence files, but
    not merge tablets.  Had the deleterows code call the merge code to
    delete tablets.
    
    Removed all merge code from tablet group watcher and related classes
    that was related to unassigning merging tablets.  Setting the operation
    ids takes care of this now.
    
    Modified the final FATE repo to delete operation ids.
    
    MergeIT and DeleteRowsIT are passing with this change
---
 .../core/manager/state/TabletManagement.java       |   2 +-
 .../accumulo/core/metadata/schema/Ample.java       |  43 +-
 .../metadata/schema/TabletMetadataBuilder.java     |   6 +
 .../core/metadata/schema/TabletMutatorBase.java    |  21 +
 .../server/manager/state/CurrentState.java         |   3 -
 .../accumulo/server/manager/state/MergeInfo.java   | 107 -----
 .../accumulo/server/manager/state/MergeState.java  |  41 --
 .../manager/state/TabletManagementIterator.java    |  47 --
 .../metadata/ConditionalTabletsMutatorImpl.java    |   7 +-
 .../server/manager/state/MergeInfoTest.java        | 206 ---------
 .../accumulo/manager/FateServiceHandler.java       |   2 +-
 .../java/org/apache/accumulo/manager/Manager.java  |  96 +---
 .../accumulo/manager/TabletGroupWatcher.java       |  14 +-
 .../manager/tableOps/merge/DeleteRows.java         | 491 +++++----------------
 .../manager/tableOps/merge/FinishTableRangeOp.java |  85 ++--
 .../accumulo/manager/tableOps/merge/MergeInfo.java | 127 ++++++
 .../manager/tableOps/merge/MergeTablets.java       | 371 +++++++---------
 .../manager/tableOps/merge/ReserveTablets.java     | 124 ++++++
 .../manager/tableOps/merge/TableRangeOp.java       |  61 +--
 .../manager/tableOps/merge/WaitForOffline.java     |  81 ----
 .../org/apache/accumulo/test/ComprehensiveIT.java  |   1 -
 .../accumulo/test/functional/DeleteRowsIT.java     |   1 +
 .../functional/TabletManagementIteratorIT.java     |  19 -
 23 files changed, 665 insertions(+), 1291 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/manager/state/TabletManagement.java
 
b/core/src/main/java/org/apache/accumulo/core/manager/state/TabletManagement.java
index 647f6f9412..495f3e2ef0 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/manager/state/TabletManagement.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/manager/state/TabletManagement.java
@@ -53,7 +53,7 @@ public class TabletManagement {
   private static final Text EMPTY = new Text("");
 
   public static enum ManagementAction {
-    BAD_STATE, NEEDS_COMPACTING, NEEDS_LOCATION_UPDATE, IS_MERGING, 
NEEDS_SPLITTING;
+    BAD_STATE, NEEDS_COMPACTING, NEEDS_LOCATION_UPDATE, NEEDS_SPLITTING;
   }
 
   public static void addActions(final SortedMap<Key,Value> decodedRow,
diff --git 
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java 
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
index 793e5b5d9b..a23e6da08e 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
@@ -22,12 +22,14 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.UUID;
 import java.util.function.Predicate;
 import java.util.stream.Stream;
 
 import org.apache.accumulo.core.client.ConditionalWriter;
 import org.apache.accumulo.core.client.admin.TabletHostingGoal;
+import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
@@ -375,6 +377,14 @@ public interface Ample {
     T putSelectedFiles(SelectedFiles selectedFiles);
 
     T deleteSelectedFiles();
+
+    /**
+     * Deletes all the columns in the keys.
+     *
+     * @throws IllegalArgumentException if rows in keys do not match tablet 
row or column visibility
+     *         is not empty
+     */
+    T deleteAll(Set<Key> keys);
   }
 
   interface TabletMutator extends TabletUpdates<TabletMutator> {
@@ -434,7 +444,33 @@ public interface Ample {
   /**
    * Convenience interface for handling conditional mutations with a status of 
REJECTED.
    */
-  interface RejectionHandler extends Predicate<TabletMetadata> {}
+  interface RejectionHandler extends Predicate<TabletMetadata> {
+
+    /**
+     * @return true if the handler should be called when a tablet no longer 
exists
+     */
+    default boolean callWhenTabletDoesNotExists() {
+      return false;
+    }
+
+    /**
+     * @return a RejectionHandler that considers that case where the tablet no 
longer exists as
+     *         accepted.
+     */
+    static RejectionHandler acceptAbsentTablet() {
+      return new Ample.RejectionHandler() {
+        @Override
+        public boolean callWhenTabletDoesNotExists() {
+          return true;
+        }
+
+        @Override
+        public boolean test(TabletMetadata tabletMetadata) {
+          return tabletMetadata == null;
+        }
+      };
+    }
+  }
 
   interface ConditionalTabletMutator extends 
TabletUpdates<ConditionalTabletMutator> {
 
@@ -548,8 +584,9 @@ public interface Ample {
      *        {@link 
org.apache.accumulo.core.client.ConditionalWriter.Status#ACCEPTED} in the
      *        return of {@link ConditionalTabletsMutator#process()}. The 
rejection handler is only
      *        called when a tablets metadata exists. If ample reads a tablet's 
metadata and the
-     *        tablet no longer exists, then ample will not call the 
rejectionHandler with null. It
-     *        will let the rejected status carry forward in this case.
+     *        tablet no longer exists, then ample will not call the 
rejectionHandler with null
+     *        (unless {@link RejectionHandler#callWhenTabletDoesNotExists()} 
returns true). It will
+     *        let the rejected status carry forward in this case.
      */
     void submit(RejectionHandler rejectionHandler);
   }
diff --git 
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadataBuilder.java
 
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadataBuilder.java
index 7df5c34f26..c393a552b4 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadataBuilder.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadataBuilder.java
@@ -38,6 +38,7 @@ import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType
 
 import java.util.Arrays;
 import java.util.EnumSet;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
@@ -267,6 +268,11 @@ public class TabletMetadataBuilder implements 
Ample.TabletUpdates<TabletMetadata
     throw new UnsupportedOperationException();
   }
 
+  @Override
+  public TabletMetadataBuilder deleteAll(Set<Key> keys) {
+    throw new UnsupportedOperationException();
+  }
+
   /**
    * @param extraFetched Anything that was put on the builder will 
automatically be added to the
    *        fetched set. However, for the case where something was not put and 
it needs to be
diff --git 
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMutatorBase.java
 
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMutatorBase.java
index 84b10fe3b5..6fe4ee9c90 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMutatorBase.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMutatorBase.java
@@ -18,8 +18,13 @@
  */
 package org.apache.accumulo.core.metadata.schema;
 
+import java.util.Set;
+
 import org.apache.accumulo.core.client.admin.TabletHostingGoal;
 import org.apache.accumulo.core.clientImpl.TabletHostingGoalUtil;
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
@@ -308,6 +313,22 @@ public abstract class TabletMutatorBase<T extends 
Ample.TabletUpdates<T>>
     return getThis();
   }
 
+  @Override
+  public T deleteAll(Set<Key> keys) {
+    ByteSequence row = new ArrayByteSequence(mutation.getRow());
+    keys.forEach(key -> {
+      Preconditions.checkArgument(key.getRowData().equals(row), "Unexpected 
row %s %s", row, key);
+      Preconditions.checkArgument(key.getColumnVisibilityData().length() == 0,
+          "Non empty column visibility %s", key);
+    });
+
+    keys.forEach(key -> {
+      mutation.putDelete(key.getColumnFamily(), key.getColumnQualifier());
+    });
+
+    return getThis();
+  }
+
   public void setCloseAfterMutate(AutoCloseable closeable) {
     this.closeAfterMutate = closeable;
   }
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/CurrentState.java
 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/CurrentState.java
index 0e1bc45f15..09caba293e 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/CurrentState.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/CurrentState.java
@@ -18,7 +18,6 @@
  */
 package org.apache.accumulo.server.manager.state;
 
-import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 
@@ -37,8 +36,6 @@ public interface CurrentState {
 
   Set<TServerInstance> shutdownServers();
 
-  Collection<MergeInfo> merges();
-
   /**
    * Provide an immutable snapshot view of migrating tablets. Objects 
contained in the set may still
    * be mutable.
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MergeInfo.java
 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MergeInfo.java
deleted file mode 100644
index 510dc5fbff..0000000000
--- 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MergeInfo.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.server.manager.state;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.accumulo.core.dataImpl.KeyExtent;
-import org.apache.hadoop.io.Writable;
-
-/**
- * Information about the current merge/rangeDelete.
- *
- * Writable to serialize for zookeeper and the Tablet
- */
-public class MergeInfo implements Writable {
-
-  public enum Operation {
-    MERGE, DELETE,
-  }
-
-  MergeState state = MergeState.NONE;
-  KeyExtent extent;
-  Operation operation = Operation.MERGE;
-
-  public MergeInfo() {}
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    extent = KeyExtent.readFrom(in);
-    state = MergeState.values()[in.readInt()];
-    operation = Operation.values()[in.readInt()];
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    extent.writeTo(out);
-    out.writeInt(state.ordinal());
-    out.writeInt(operation.ordinal());
-  }
-
-  public MergeInfo(KeyExtent range, Operation op) {
-    this.extent = range;
-    this.operation = op;
-  }
-
-  public MergeState getState() {
-    return state;
-  }
-
-  public KeyExtent getExtent() {
-    return extent;
-  }
-
-  public Operation getOperation() {
-    return operation;
-  }
-
-  public void setState(MergeState state) {
-    this.state = state;
-  }
-
-  public boolean isDelete() {
-    return this.operation.equals(Operation.DELETE);
-  }
-
-  public boolean needsToBeChopped(KeyExtent otherExtent) {
-    if (isDelete() && otherExtent.tableId().equals(extent.tableId())) {
-      return otherExtent.prevEndRow() != null && 
otherExtent.prevEndRow().equals(extent.endRow());
-    } else {
-      return false;
-    }
-  }
-
-  public boolean overlaps(KeyExtent otherExtent) {
-    boolean result = this.extent.overlaps(otherExtent);
-    if (!result && needsToBeChopped(otherExtent)) {
-      return true;
-    }
-    return result;
-  }
-
-  @Override
-  public String toString() {
-    if (!state.equals(MergeState.NONE)) {
-      return "Merge " + operation + " of " + extent + " State: " + state;
-    }
-    return "No Merge in progress";
-  }
-}
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MergeState.java
 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MergeState.java
deleted file mode 100644
index 9bf6616bdf..0000000000
--- 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MergeState.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.server.manager.state;
-
-public enum MergeState {
-  /**
-   * Not merging
-   */
-  NONE,
-  /**
-   * when the number of chopped tablets in the range matches the number of 
online tablets in the
-   * range, take the tablets offline
-   */
-  WAITING_FOR_OFFLINE,
-  /**
-   * when the number of chopped, offline tablets equals the number of merge 
tablets, begin the
-   * metadata updates
-   */
-  MERGING,
-  /**
-   * merge is complete, the resulting tablet can be brought online, remove the 
marker in zookeeper
-   */
-  COMPLETE
-
-}
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java
 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java
index e0787dbee1..41646f4145 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java
@@ -95,7 +95,6 @@ public class TabletManagementIterator extends 
SkippingIterator {
 
   private static final String SERVERS_OPTION = "servers";
   private static final String TABLES_OPTION = "tables";
-  private static final String MERGES_OPTION = "merges";
   private static final String MIGRATIONS_OPTION = "migrations";
   private static final String MANAGER_STATE_OPTION = "managerState";
   private static final String SHUTTING_DOWN_OPTION = "shuttingDown";
@@ -122,23 +121,6 @@ public class TabletManagementIterator extends 
SkippingIterator {
     }
   }
 
-  private static void setMerges(final IteratorSetting cfg, final 
Collection<MergeInfo> merges) {
-    DataOutputBuffer buffer = new DataOutputBuffer();
-    try {
-      for (MergeInfo info : merges) {
-        KeyExtent extent = info.getExtent();
-        if (extent != null && !info.getState().equals(MergeState.NONE)) {
-          info.write(buffer);
-        }
-      }
-    } catch (Exception ex) {
-      throw new RuntimeException(ex);
-    }
-    String encoded =
-        Base64.getEncoder().encodeToString(Arrays.copyOf(buffer.getData(), 
buffer.getLength()));
-    cfg.addOption(MERGES_OPTION, encoded);
-  }
-
   private static void setMigrations(final IteratorSetting cfg,
       final Collection<KeyExtent> migrations) {
     DataOutputBuffer buffer = new DataOutputBuffer();
@@ -242,25 +224,6 @@ public class TabletManagementIterator extends 
SkippingIterator {
     return result;
   }
 
-  private static Map<TableId,MergeInfo> parseMerges(final String merges) {
-    Map<TableId,MergeInfo> result = new HashMap<>();
-    if (merges != null) {
-      try {
-        DataInputBuffer buffer = new DataInputBuffer();
-        byte[] data = Base64.getDecoder().decode(merges);
-        buffer.reset(data, data.length);
-        while (buffer.available() > 0) {
-          MergeInfo mergeInfo = new MergeInfo();
-          mergeInfo.readFields(buffer);
-          result.put(mergeInfo.extent.tableId(), mergeInfo);
-        }
-      } catch (Exception ex) {
-        throw new RuntimeException(ex);
-      }
-    }
-    return result;
-  }
-
   private static Map<Long,Map<String,String>> parseCompactionHints(String 
json) {
     if (json == null) {
       return Map.of();
@@ -343,7 +306,6 @@ public class TabletManagementIterator extends 
SkippingIterator {
     if (state != null) {
       TabletManagementIterator.setCurrentServers(tabletChange, 
state.onlineTabletServers());
       TabletManagementIterator.setOnlineTables(tabletChange, 
state.onlineTables());
-      TabletManagementIterator.setMerges(tabletChange, state.merges());
       TabletManagementIterator.setMigrations(tabletChange, 
state.migrationsSnapshot());
       TabletManagementIterator.setManagerState(tabletChange, 
state.getManagerState());
       TabletManagementIterator.setShuttingDown(tabletChange, 
state.shutdownServers());
@@ -361,7 +323,6 @@ public class TabletManagementIterator extends 
SkippingIterator {
   private final Set<TServerInstance> current = new HashSet<>();
   private final Set<TableId> onlineTables = new HashSet<>();
   private final Map<TabletServerId,String> tserverResourceGroups = new 
HashMap<>();
-  private final Map<TableId,MergeInfo> merges = new HashMap<>();
   private final Set<KeyExtent> migrations = new HashSet<>();
   private ManagerState managerState = ManagerState.NORMAL;
   private IteratorEnvironment env;
@@ -376,7 +337,6 @@ public class TabletManagementIterator extends 
SkippingIterator {
     current.addAll(parseServers(options.get(SERVERS_OPTION)));
     onlineTables.addAll(parseTableIDs(options.get(TABLES_OPTION)));
     tserverResourceGroups.putAll(parseTServerResourceGroups(options));
-    merges.putAll(parseMerges(options.get(MERGES_OPTION)));
     migrations.addAll(parseMigrations(options.get(MIGRATIONS_OPTION)));
     String managerStateOptionValue = options.get(MANAGER_STATE_OPTION);
     try {
@@ -480,13 +440,6 @@ public class TabletManagementIterator extends 
SkippingIterator {
       return;
     }
 
-    // we always want data about merges
-    final MergeInfo merge = merges.get(tm.getTableId());
-    if (merge != null) {
-      // could make this smarter by only returning if the tablet is involved 
in the merge
-      reasonsToReturnThisTablet.add(ManagementAction.IS_MERGING);
-    }
-
     if (shouldReturnDueToLocation(tm, onlineTables, current)) {
       reasonsToReturnThisTablet.add(ManagementAction.NEEDS_LOCATION_UPDATE);
     }
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java
 
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java
index cfcc1fec49..71cd17c380 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java
@@ -231,7 +231,12 @@ public class ConditionalTabletsMutatorImpl implements 
Ample.ConditionalTabletsMu
             var status = _getStatus();
             if (status == Status.REJECTED && 
rejectedHandlers.containsKey(extent)) {
               var tabletMetadata = readMetadata();
-              if (tabletMetadata != null && 
rejectedHandlers.get(extent).test(tabletMetadata)) {
+              var handler = rejectedHandlers.get(extent);
+              if (tabletMetadata == null && 
handler.callWhenTabletDoesNotExists()
+                  && handler.test(null)) {
+                return Status.ACCEPTED;
+              }
+              if (tabletMetadata != null && handler.test(tabletMetadata)) {
                 return Status.ACCEPTED;
               }
             }
diff --git 
a/server/base/src/test/java/org/apache/accumulo/server/manager/state/MergeInfoTest.java
 
b/server/base/src/test/java/org/apache/accumulo/server/manager/state/MergeInfoTest.java
deleted file mode 100644
index 2d7a3a7927..0000000000
--- 
a/server/base/src/test/java/org/apache/accumulo/server/manager/state/MergeInfoTest.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.server.manager.state;
-
-import static org.easymock.EasyMock.createMock;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.replay;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertSame;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-
-import org.apache.accumulo.core.data.TableId;
-import org.apache.accumulo.core.dataImpl.KeyExtent;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.Text;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-public class MergeInfoTest {
-
-  private KeyExtent keyExtent;
-  private MergeInfo mi;
-
-  @BeforeEach
-  public void setUp() {
-    keyExtent = createMock(KeyExtent.class);
-  }
-
-  @Test
-  public void testConstruction_NoArgs() {
-    mi = new MergeInfo();
-    assertEquals(MergeState.NONE, mi.getState());
-    assertNull(mi.getExtent());
-    assertEquals(MergeInfo.Operation.MERGE, mi.getOperation());
-    assertFalse(mi.isDelete());
-  }
-
-  @Test
-  public void testConstruction_2Args() {
-    mi = new MergeInfo(keyExtent, MergeInfo.Operation.DELETE);
-    assertEquals(MergeState.NONE, mi.getState());
-    assertSame(keyExtent, mi.getExtent());
-    assertEquals(MergeInfo.Operation.DELETE, mi.getOperation());
-    assertTrue(mi.isDelete());
-  }
-
-  @Test
-  public void testSerialization() throws Exception {
-    String table = "table";
-    Text endRow = new Text("end");
-    Text prevEndRow = new Text("begin");
-    keyExtent = new KeyExtent(TableId.of(table), endRow, prevEndRow);
-    mi = new MergeInfo(keyExtent, MergeInfo.Operation.DELETE);
-    mi.setState(MergeState.WAITING_FOR_OFFLINE);
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    DataOutputStream dos = new DataOutputStream(baos);
-    mi.write(dos);
-    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
-    DataInputStream dis = new DataInputStream(bais);
-    mi = new MergeInfo();
-    mi.readFields(dis);
-    assertSame(MergeState.WAITING_FOR_OFFLINE, mi.getState());
-    assertEquals(keyExtent, mi.getExtent());
-    assertSame(MergeInfo.Operation.DELETE, mi.getOperation());
-  }
-
-  @Test
-  public void testNeedsToBeChopped_DifferentTables() {
-    expect(keyExtent.tableId()).andReturn(TableId.of("table1"));
-    replay(keyExtent);
-    KeyExtent keyExtent2 = createMock(KeyExtent.class);
-    expect(keyExtent2.tableId()).andReturn(TableId.of("table2"));
-    replay(keyExtent2);
-    mi = new MergeInfo(keyExtent, MergeInfo.Operation.MERGE);
-    assertFalse(mi.needsToBeChopped(keyExtent2));
-  }
-
-  @Test
-  public void testNeedsToBeChopped_Delete_NotFollowing() {
-    testNeedsToBeChopped_Delete("somerow", false);
-  }
-
-  @Test
-  public void testNeedsToBeChopped_Delete_Following() {
-    testNeedsToBeChopped_Delete("prev", true);
-  }
-
-  @Test
-  public void testNeedsToBeChopped_Delete_NoPrevEndRow() {
-    testNeedsToBeChopped_Delete(null, false);
-  }
-
-  private void testNeedsToBeChopped_Delete(String prevEndRow, boolean 
expected) {
-    expect(keyExtent.tableId()).andReturn(TableId.of("table1"));
-    expect(keyExtent.endRow()).andReturn(new Text("prev"));
-    replay(keyExtent);
-    KeyExtent keyExtent2 = createMock(KeyExtent.class);
-    expect(keyExtent2.tableId()).andReturn(TableId.of("table1"));
-    expect(keyExtent2.prevEndRow()).andReturn(prevEndRow != null ? new 
Text(prevEndRow) : null);
-    expectLastCall().anyTimes();
-    replay(keyExtent2);
-    mi = new MergeInfo(keyExtent, MergeInfo.Operation.DELETE);
-    assertEquals(expected, mi.needsToBeChopped(keyExtent2));
-  }
-
-  @Test
-  public void testOverlaps_ExtentsOverlap() {
-    KeyExtent keyExtent2 = createMock(KeyExtent.class);
-    expect(keyExtent.overlaps(keyExtent2)).andReturn(true);
-    replay(keyExtent);
-    mi = new MergeInfo(keyExtent, MergeInfo.Operation.MERGE);
-    assertTrue(mi.overlaps(keyExtent2));
-  }
-
-  @Test
-  public void testOverlaps_DoesNotNeedChopping() {
-    KeyExtent keyExtent2 = createMock(KeyExtent.class);
-    expect(keyExtent.overlaps(keyExtent2)).andReturn(false);
-    expect(keyExtent.tableId()).andReturn(TableId.of("table1"));
-    replay(keyExtent);
-    expect(keyExtent2.tableId()).andReturn(TableId.of("table2"));
-    replay(keyExtent2);
-    mi = new MergeInfo(keyExtent, MergeInfo.Operation.MERGE);
-    assertFalse(mi.overlaps(keyExtent2));
-  }
-
-  @Test
-  public void testOverlaps_NeedsChopping() {
-    KeyExtent keyExtent2 = createMock(KeyExtent.class);
-    expect(keyExtent.overlaps(keyExtent2)).andReturn(false);
-    expect(keyExtent.tableId()).andReturn(TableId.of("table1"));
-    expect(keyExtent.endRow()).andReturn(new Text("prev"));
-    replay(keyExtent);
-    expect(keyExtent2.tableId()).andReturn(TableId.of("table1"));
-    expect(keyExtent2.prevEndRow()).andReturn(new Text("prev"));
-    expectLastCall().anyTimes();
-    replay(keyExtent2);
-    mi = new MergeInfo(keyExtent, MergeInfo.Operation.DELETE);
-    assertTrue(mi.overlaps(keyExtent2));
-  }
-
-  private static MergeInfo readWrite(MergeInfo info) throws Exception {
-    DataOutputBuffer buffer = new DataOutputBuffer();
-    info.write(buffer);
-    DataInputBuffer in = new DataInputBuffer();
-    in.reset(buffer.getData(), 0, buffer.getLength());
-    MergeInfo info2 = new MergeInfo();
-    info2.readFields(in);
-    assertEquals(info.getExtent(), info2.getExtent());
-    assertEquals(info.getState(), info2.getState());
-    assertEquals(info.getOperation(), info2.getOperation());
-    return info2;
-  }
-
-  private static KeyExtent ke(String tableId, String endRow, String 
prevEndRow) {
-    return new KeyExtent(TableId.of(tableId), endRow == null ? null : new 
Text(endRow),
-        prevEndRow == null ? null : new Text(prevEndRow));
-  }
-
-  @Test
-  public void testWritable() throws Exception {
-    MergeInfo info;
-    info = readWrite(new MergeInfo(ke("a", null, "b"), 
MergeInfo.Operation.MERGE));
-    info = readWrite(new MergeInfo(ke("a", "b", null), 
MergeInfo.Operation.MERGE));
-    info = readWrite(new MergeInfo(ke("x", "b", "a"), 
MergeInfo.Operation.MERGE));
-    info = readWrite(new MergeInfo(ke("x", "b", "a"), 
MergeInfo.Operation.DELETE));
-    assertTrue(info.isDelete());
-    info.setState(MergeState.COMPLETE);
-  }
-
-  @Test
-  public void testNeedsToBeChopped() {
-    MergeInfo info = new MergeInfo(ke("x", "b", "a"), 
MergeInfo.Operation.DELETE);
-    assertTrue(info.needsToBeChopped(ke("x", "c", "b")));
-    assertTrue(info.overlaps(ke("x", "c", "b")));
-    assertFalse(info.needsToBeChopped(ke("y", "c", "b")));
-    assertFalse(info.needsToBeChopped(ke("x", "c", "bb")));
-    assertFalse(info.needsToBeChopped(ke("x", "b", "a")));
-  }
-
-}
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java
index 8c9d217cd8..e94e4907bc 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java
@@ -89,6 +89,7 @@ import 
org.apache.accumulo.manager.tableOps.compact.cancel.CancelCompactions;
 import org.apache.accumulo.manager.tableOps.create.CreateTable;
 import org.apache.accumulo.manager.tableOps.delete.PreDeleteTable;
 import org.apache.accumulo.manager.tableOps.goal.SetHostingGoal;
+import org.apache.accumulo.manager.tableOps.merge.MergeInfo;
 import org.apache.accumulo.manager.tableOps.merge.TableRangeOp;
 import org.apache.accumulo.manager.tableOps.namespace.create.CreateNamespace;
 import org.apache.accumulo.manager.tableOps.namespace.delete.DeleteNamespace;
@@ -98,7 +99,6 @@ import org.apache.accumulo.manager.tableOps.split.PreSplit;
 import org.apache.accumulo.manager.tableOps.tableExport.ExportTable;
 import org.apache.accumulo.manager.tableOps.tableImport.ImportTable;
 import org.apache.accumulo.server.client.ClientServiceHandler;
-import org.apache.accumulo.server.manager.state.MergeInfo;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 60b2115d6e..5257a610a3 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -74,9 +74,7 @@ import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.fate.AgeOffStore;
 import org.apache.accumulo.core.fate.Fate;
 import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
-import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
 import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
-import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy;
 import org.apache.accumulo.core.lock.ServiceLock;
 import org.apache.accumulo.core.lock.ServiceLock.LockLossReason;
 import org.apache.accumulo.core.lock.ServiceLock.ServiceLockPath;
@@ -135,8 +133,6 @@ import 
org.apache.accumulo.server.manager.LiveTServerSet.TServerConnection;
 import org.apache.accumulo.server.manager.balancer.BalancerEnvironmentImpl;
 import org.apache.accumulo.server.manager.state.CurrentState;
 import org.apache.accumulo.server.manager.state.DeadServerList;
-import org.apache.accumulo.server.manager.state.MergeInfo;
-import org.apache.accumulo.server.manager.state.MergeState;
 import org.apache.accumulo.server.manager.state.TabletServerState;
 import org.apache.accumulo.server.manager.state.TabletStateStore;
 import org.apache.accumulo.server.manager.state.UnassignedTablet;
@@ -152,8 +148,6 @@ import org.apache.accumulo.server.tables.TableObserver;
 import org.apache.accumulo.server.util.ScanServerMetadataEntries;
 import org.apache.accumulo.server.util.ServerBulkImportStatus;
 import org.apache.accumulo.server.util.TableInfoUtil;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.thrift.TException;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.transport.TTransportException;
@@ -203,7 +197,6 @@ public class Manager extends AbstractServer
   final SortedMap<KeyExtent,TServerInstance> migrations =
       Collections.synchronizedSortedMap(new TreeMap<>());
   final EventCoordinator nextEvent = new EventCoordinator();
-  private final Object mergeLock = new Object();
   RecoveryManager recoveryManager = null;
   private final ManagerTime timeKeeper;
 
@@ -498,65 +491,6 @@ public class Manager extends AbstractServer
     return tserverSet.getConnection(server);
   }
 
-  public MergeInfo getMergeInfo(TableId tableId) {
-    ServerContext context = getContext();
-    synchronized (mergeLock) {
-      try {
-        String path = getZooKeeperRoot() + Constants.ZTABLES + "/" + tableId + 
"/merge";
-        if (!context.getZooReaderWriter().exists(path)) {
-          return new MergeInfo();
-        }
-        byte[] data = context.getZooReaderWriter().getData(path);
-        DataInputBuffer in = new DataInputBuffer();
-        in.reset(data, data.length);
-        MergeInfo info = new MergeInfo();
-        info.readFields(in);
-        return info;
-      } catch (KeeperException.NoNodeException ex) {
-        log.info("Error reading merge state, it probably just finished");
-        return new MergeInfo();
-      } catch (Exception ex) {
-        log.warn("Unexpected error reading merge state", ex);
-        return new MergeInfo();
-      }
-    }
-  }
-
-  public void setMergeState(MergeInfo info, MergeState state)
-      throws KeeperException, InterruptedException {
-    ServerContext context = getContext();
-    synchronized (mergeLock) {
-      String path =
-          getZooKeeperRoot() + Constants.ZTABLES + "/" + 
info.getExtent().tableId() + "/merge";
-      info.setState(state);
-      if (state.equals(MergeState.NONE)) {
-        context.getZooReaderWriter().recursiveDelete(path, 
NodeMissingPolicy.SKIP);
-      } else {
-        DataOutputBuffer out = new DataOutputBuffer();
-        try {
-          info.write(out);
-        } catch (IOException ex) {
-          throw new AssertionError("Unlikely", ex);
-        }
-        context.getZooReaderWriter().putPersistentData(path, out.getData(),
-            state.equals(MergeState.WAITING_FOR_OFFLINE) ? 
ZooUtil.NodeExistsPolicy.FAIL
-                : ZooUtil.NodeExistsPolicy.OVERWRITE);
-      }
-      mergeLock.notifyAll();
-    }
-    nextEvent.event(info.getExtent().tableId(), "Merge state of %s set to %s", 
info.getExtent(),
-        state);
-  }
-
-  public void clearMergeState(TableId tableId) throws KeeperException, 
InterruptedException {
-    synchronized (mergeLock) {
-      String path = getZooKeeperRoot() + Constants.ZTABLES + "/" + tableId + 
"/merge";
-      getContext().getZooReaderWriter().recursiveDelete(path, 
NodeMissingPolicy.SKIP);
-      mergeLock.notifyAll();
-    }
-    nextEvent.event(tableId, "Merge state of %s cleared", tableId);
-  }
-
   void setManagerGoalState(ManagerGoalState state) {
     try {
       getContext().getZooReaderWriter().putPersistentData(
@@ -681,7 +615,7 @@ public class Manager extends AbstractServer
     }
   }
 
-  TabletGoalState getGoalState(TabletMetadata tm, MergeInfo mergeInfo) {
+  TabletGoalState getGoalState(TabletMetadata tm) {
     KeyExtent extent = tm.getExtent();
     // Shutting down?
     TabletGoalState state = getSystemGoalState(tm);
@@ -700,25 +634,6 @@ public class Manager extends AbstractServer
       if (tm.hasCurrent() && 
serversToShutdown.contains(tm.getLocation().getServerInstance())) {
         return TabletGoalState.SUSPENDED;
       }
-      // Handle merge transitions
-      if (mergeInfo.getExtent() != null) {
-
-        final boolean overlaps = mergeInfo.overlaps(extent);
-
-        if (overlaps) {
-          log.debug("mergeInfo overlaps: {} true {}", extent, 
mergeInfo.getState());
-          switch (mergeInfo.getState()) {
-            case NONE:
-            case COMPLETE:
-              break;
-            case WAITING_FOR_OFFLINE:
-            case MERGING:
-              return TabletGoalState.UNASSIGNED;
-          }
-        } else {
-          log.trace("mergeInfo overlaps: {} false", extent);
-        }
-      }
 
       // taking table offline?
       state = getTableGoalState(tm);
@@ -1699,15 +1614,6 @@ public class Manager extends AbstractServer
     return tserverSet.getCurrentServersGroups();
   }
 
-  @Override
-  public Collection<MergeInfo> merges() {
-    List<MergeInfo> result = new ArrayList<>();
-    for (TableId tableId : getContext().getTableIdToNameMap().keySet()) {
-      result.add(getMergeInfo(tableId));
-    }
-    return result;
-  }
-
   // recovers state from the persistent transaction to shutdown a server
   public void shutdownTServer(TServerInstance server) {
     nextEvent.event("Tablet Server shutdown requested for %s", server);
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
index dece4f4497..cdb6953e8a 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
@@ -85,7 +85,6 @@ import 
org.apache.accumulo.server.manager.LiveTServerSet.TServerConnection;
 import org.apache.accumulo.server.manager.state.Assignment;
 import org.apache.accumulo.server.manager.state.ClosableIterator;
 import org.apache.accumulo.server.manager.state.DistributedStoreException;
-import org.apache.accumulo.server.manager.state.MergeInfo;
 import org.apache.accumulo.server.manager.state.TabletManagementIterator;
 import org.apache.accumulo.server.manager.state.TabletStateStore;
 import org.apache.accumulo.server.manager.state.UnassignedTablet;
@@ -313,13 +312,6 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
 
     int unloaded = 0;
 
-    Map<TableId,MergeInfo> currentMerges = new HashMap<>();
-    for (MergeInfo merge : manager.merges()) {
-      if (merge.getExtent() != null) {
-        currentMerges.put(merge.getExtent().tableId(), merge);
-      }
-    }
-
     final Map<String,Set<TServerInstance>> currentTServerGrouping =
         manager.tserverSet.getCurrentServersGroups();
 
@@ -376,8 +368,7 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
 
       final TableConfiguration tableConf = 
manager.getContext().getTableConfiguration(tableId);
 
-      TabletGoalState goal = manager.getGoalState(tm,
-          currentMerges.computeIfAbsent(tm.getTableId(), k -> new 
MergeInfo()));
+      TabletGoalState goal = manager.getGoalState(tm);
       TabletState state =
           TabletState.compute(tm, currentTServers.keySet(), 
manager.tabletBalancer, resourceGroups);
 
@@ -480,8 +471,7 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
       // entries from the queue because we see nothing here for that case. 
After a full
       // metadata scan could remove any tablets that were not updated during 
the scan.
 
-      if (actions.contains(ManagementAction.NEEDS_LOCATION_UPDATE)
-          || actions.contains(ManagementAction.IS_MERGING)) {
+      if (actions.contains(ManagementAction.NEEDS_LOCATION_UPDATE)) {
         if (goal == TabletGoalState.HOSTED) {
           if ((state != TabletState.HOSTED && !tm.getLogs().isEmpty())
               && manager.recoveryManager.recoverLogs(tm.getExtent(), 
tm.getLogs())) {
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteRows.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteRows.java
index c6614cae5f..b67417f321 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteRows.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteRows.java
@@ -18,57 +18,38 @@
  */
 package org.apache.accumulo.manager.tableOps.merge;
 
+import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
+import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
+import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID;
+import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
+import static 
org.apache.accumulo.manager.tableOps.merge.MergeTablets.validateTablet;
+
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.function.Predicate;
-import java.util.stream.Stream;
-
-import org.apache.accumulo.core.client.AccumuloClient;
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.MutationsRejectedException;
-import org.apache.accumulo.core.client.RowIterator;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableNotFoundException;
+
 import org.apache.accumulo.core.client.admin.TabletHostingGoal;
-import org.apache.accumulo.core.clientImpl.TabletHostingGoalUtil;
 import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.NamespaceId;
 import org.apache.accumulo.core.data.PartialKey;
 import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.TableId;
-import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.fate.FateTxId;
 import org.apache.accumulo.core.fate.Repo;
-import org.apache.accumulo.core.gc.ReferenceFile;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.RootTable;
 import org.apache.accumulo.core.metadata.StoredTabletFile;
 import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema;
-import org.apache.accumulo.core.metadata.schema.MetadataTime;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
-import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
-import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.metadata.schema.TabletOperationId;
+import org.apache.accumulo.core.metadata.schema.TabletOperationType;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.manager.Manager;
 import org.apache.accumulo.manager.tableOps.ManagerRepo;
-import org.apache.accumulo.server.ServerContext;
-import org.apache.accumulo.server.gc.AllVolumesDirectory;
-import org.apache.accumulo.server.manager.state.MergeInfo;
-import org.apache.accumulo.server.manager.state.MergeState;
-import org.apache.accumulo.server.util.MetadataTableUtil;
 import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -83,194 +64,58 @@ public class DeleteRows extends ManagerRepo {
 
   private static final Logger log = LoggerFactory.getLogger(DeleteRows.class);
 
-  private final NamespaceId namespaceId;
-  private final TableId tableId;
+  private final MergeInfo data;
 
-  public DeleteRows(NamespaceId namespaceId, TableId tableId) {
-    this.namespaceId = namespaceId;
-    this.tableId = tableId;
+  public DeleteRows(MergeInfo data) {
+    Preconditions.checkArgument(data.op == MergeInfo.Operation.DELETE);
+    this.data = data;
   }
 
   @Override
   public Repo<Manager> call(long tid, Manager manager) throws Exception {
-    MergeInfo mergeInfo = manager.getMergeInfo(tableId);
-    Preconditions.checkState(mergeInfo.getState() == MergeState.MERGING);
-    Preconditions.checkState(mergeInfo.isDelete());
+    // delete or fence files within the deletion range
+    var mergeRange = deleteTabletFiles(manager, tid);
 
-    deleteTablets(manager, mergeInfo);
+    // merge away empty tablets in the deletion range
+    return new MergeTablets(mergeRange.map(mre -> 
data.useMergeRange(mre)).orElse(data));
+  }
 
-    manager.setMergeState(mergeInfo, MergeState.COMPLETE);
+  private Optional<KeyExtent> deleteTabletFiles(Manager manager, long tid) {
+    // Only delete data within the original extent specified by the user
+    KeyExtent range = data.getOriginalExtent();
+    var fateStr = FateTxId.formatTid(tid);
+    log.debug("{} deleting tablet files in range {}", fateStr, range);
+    var opid = TabletOperationId.from(TabletOperationType.MERGING, tid);
 
-    // TODO namespace id
-    return new FinishTableRangeOp(namespaceId, tableId);
-  }
+    try (
+        var tabletsMetadata = manager.getContext().getAmple().readTablets()
+            .forTable(range.tableId()).overlapping(range.prevEndRow(), 
range.endRow())
+            .fetch(OPID, LOCATION, FILES, PREV_ROW).checkConsistency().build();
+        var tabletsMutator = 
manager.getContext().getAmple().conditionallyMutateTablets()) {
 
-  private void deleteTablets(Manager manager, MergeInfo info) throws 
AccumuloException {
-    // Before updated metadata and get the first and last tablets which
-    // are fenced if necessary
-    final Pair<KeyExtent,KeyExtent> firstAndLastTablets =
-        updateMetadataRecordsForDelete(manager, info);
-
-    // Find the deletion start row (exclusive) for tablets that need to be 
actually deleted
-    // This will be null if deleting everything up until the end row or it 
will be
-    // the endRow of the first tablet as the first tablet should be kept and 
will have
-    // already been fenced if necessary
-    final Text deletionStartRow = 
getDeletionStartRow(firstAndLastTablets.getFirst());
-
-    // Find the deletion end row (inclusive) for tablets that need to be 
actually deleted
-    // This will be null if deleting everything after the starting row or it 
will be
-    // the prevEndRow of the last tablet as the last tablet should be kept and 
will have
-    // already been fenced if necessary
-    Text deletionEndRow = getDeletionEndRow(firstAndLastTablets.getSecond());
-
-    // check if there are any tablets to delete and if not return
-    if (!hasTabletsToDelete(firstAndLastTablets.getFirst(), 
firstAndLastTablets.getSecond())) {
-      log.trace("No tablets to delete for range {}, returning", 
info.getExtent());
-      return;
-    }
+      KeyExtent firstCompleteContained = null;
+      KeyExtent lastCompletelyContained = null;
 
-    // Build an extent for the actual deletion range
-    final KeyExtent extent =
-        new KeyExtent(info.getExtent().tableId(), deletionEndRow, 
deletionStartRow);
-    log.debug("Tablet deletion range is {}", extent);
-    String targetSystemTable = extent.isMeta() ? RootTable.NAME : 
MetadataTable.NAME;
-    log.debug("Deleting tablets for {}", extent);
-    MetadataTime metadataTime = null;
-    KeyExtent followingTablet = null;
-    Set<TabletHostingGoal> goals = new HashSet<>();
-    if (extent.endRow() != null) {
-      Key nextExtent = new Key(extent.endRow()).followingKey(PartialKey.ROW);
-      followingTablet = getHighTablet(manager,
-          new KeyExtent(extent.tableId(), nextExtent.getRow(), 
extent.endRow()));
-      log.debug("Found following tablet {}", followingTablet);
-    }
-    try {
-      AccumuloClient client = manager.getContext();
-      ServerContext context = manager.getContext();
-      Ample ample = context.getAmple();
-      Text start = extent.prevEndRow();
-      if (start == null) {
-        start = new Text();
-      }
-      log.debug("Making file deletion entries for {}", extent);
-      Range deleteRange =
-          new Range(MetadataSchema.TabletsSection.encodeRow(extent.tableId(), 
start), false,
-              MetadataSchema.TabletsSection.encodeRow(extent.tableId(), 
extent.endRow()), true);
-      Scanner scanner = client.createScanner(targetSystemTable, 
Authorizations.EMPTY);
-      scanner.setRange(deleteRange);
-      
MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner);
-      
MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN.fetch(scanner);
-      
MetadataSchema.TabletsSection.HostingColumnFamily.GOAL_COLUMN.fetch(scanner);
-      
scanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
-      
scanner.fetchColumnFamily(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME);
-      Set<ReferenceFile> datafilesAndDirs = new TreeSet<>();
-      for (Map.Entry<Key,Value> entry : scanner) {
-        Key key = entry.getKey();
-        if 
(key.compareColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME)
 == 0) {
-          var stf = new 
StoredTabletFile(key.getColumnQualifierData().toString());
-          datafilesAndDirs.add(new ReferenceFile(stf.getTableId(), stf));
-          if (datafilesAndDirs.size() > 1000) {
-            ample.putGcFileAndDirCandidates(extent.tableId(), 
datafilesAndDirs);
-            datafilesAndDirs.clear();
-          }
-        } else if 
(MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN.hasColumns(key)) {
-          metadataTime = MetadataTime.parse(entry.getValue().toString());
-        } else if (key.compareColumnFamily(
-            MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME) == 
0) {
-          throw new IllegalStateException(
-              "Tablet " + key.getRow() + " is assigned during a merge!");
-        } else if 
(MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN
-            .hasColumns(key)) {
-          var allVolumesDirectory =
-              new AllVolumesDirectory(extent.tableId(), 
entry.getValue().toString());
-          datafilesAndDirs.add(allVolumesDirectory);
-          if (datafilesAndDirs.size() > 1000) {
-            ample.putGcFileAndDirCandidates(extent.tableId(), 
datafilesAndDirs);
-            datafilesAndDirs.clear();
-          }
-        } else if 
(MetadataSchema.TabletsSection.HostingColumnFamily.GOAL_COLUMN.hasColumns(key)) 
{
-          TabletHostingGoal thisGoal = 
TabletHostingGoalUtil.fromValue(entry.getValue());
-          goals.add(thisGoal);
-        }
-      }
-      ample.putGcFileAndDirCandidates(extent.tableId(), datafilesAndDirs);
-      BatchWriter bw = client.createBatchWriter(targetSystemTable);
-      try {
-        deleteTablets(info, deleteRange, bw, client);
-      } finally {
-        bw.close();
-      }
+      for (var tabletMetadata : tabletsMetadata) {
+        validateTablet(tabletMetadata, fateStr, opid, data.tableId);
 
-      if (followingTablet != null) {
-        log.debug("Updating prevRow of {} to {}", followingTablet, 
extent.prevEndRow());
-        bw = client.createBatchWriter(targetSystemTable);
-        try {
-          Mutation m = new Mutation(followingTablet.toMetaRow());
-          
MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.put(m,
-              MetadataSchema.TabletsSection.TabletColumnFamily
-                  .encodePrevEndRow(extent.prevEndRow()));
-          bw.addMutation(m);
-          bw.flush();
-        } finally {
-          bw.close();
-        }
-      } else {
-        // Recreate the default tablet to hold the end of the table
-        MetadataTableUtil.addTablet(new KeyExtent(extent.tableId(), null, 
extent.prevEndRow()),
-            
MetadataSchema.TabletsSection.ServerColumnFamily.DEFAULT_TABLET_DIR_NAME,
-            manager.getContext(), metadataTime.getType(), 
manager.getManagerLock(),
-            getMergeHostingGoal(extent, goals));
-      }
-    } catch (RuntimeException | TableNotFoundException ex) {
-      throw new AccumuloException(ex);
-    }
-  }
+        var tabletMutator = 
tabletsMutator.mutateTablet(tabletMetadata.getExtent())
+            .requireOperation(opid).requireAbsentLocation();
 
-  private Pair<KeyExtent,KeyExtent> updateMetadataRecordsForDelete(Manager 
manager, MergeInfo info)
-      throws AccumuloException {
-    final KeyExtent range = info.getExtent();
+        Set<StoredTabletFile> filesToDelete = new HashSet<>();
+        Map<StoredTabletFile,DataFileValue> filesToAddMap = new HashMap<>();
 
-    String targetSystemTable = MetadataTable.NAME;
-    if (range.isMeta()) {
-      targetSystemTable = RootTable.NAME;
-    }
-    final Pair<KeyExtent,KeyExtent> startAndEndTablets;
-
-    final AccumuloClient client = manager.getContext();
-
-    try (BatchWriter bw = client.createBatchWriter(targetSystemTable)) {
-      final Text startRow = range.prevEndRow();
-      final Text endRow = range.endRow() != null
-          ? new Key(range.endRow()).followingKey(PartialKey.ROW).getRow() : 
null;
-
-      // Find the tablets that overlap the start and end row of the deletion 
range
-      // If the startRow is null then there will be an empty startTablet we 
don't need
-      // to fence a starting tablet as we are deleting everything up to the 
end tablet
-      // Likewise, if the endRow is null there will be an empty endTablet as 
we are deleting
-      // all tablets after the starting tablet
-      final Optional<TabletMetadata> startTablet =
-          Optional.ofNullable(startRow).flatMap(row -> 
loadTabletMetadata(manager, range.tableId(),
-              row, TabletMetadata.ColumnType.PREV_ROW, 
TabletMetadata.ColumnType.FILES));
-      final Optional<TabletMetadata> endTablet =
-          Optional.ofNullable(endRow).flatMap(row -> 
loadTabletMetadata(manager, range.tableId(),
-              row, TabletMetadata.ColumnType.PREV_ROW, 
TabletMetadata.ColumnType.FILES));
-
-      // Store the tablets in a Map if present so that if we have the same 
Tablet we
-      // only need to process the same tablet once when fencing
-      final SortedMap<KeyExtent,TabletMetadata> tabletMetadatas = new 
TreeMap<>();
-      startTablet.ifPresent(ft -> tabletMetadatas.put(ft.getExtent(), ft));
-      endTablet.ifPresent(lt -> tabletMetadatas.putIfAbsent(lt.getExtent(), 
lt));
-
-      // Capture the tablets to return them or null if not loaded
-      startAndEndTablets = new 
Pair<>(startTablet.map(TabletMetadata::getExtent).orElse(null),
-          endTablet.map(TabletMetadata::getExtent).orElse(null));
-
-      for (TabletMetadata tabletMetadata : tabletMetadatas.values()) {
-        final KeyExtent keyExtent = tabletMetadata.getExtent();
-
-        // Check if this tablet needs to have its files fenced for the deletion
-        if (needsFencingForDeletion(info, keyExtent)) {
-          log.debug("Found overlapping keyExtent {} for delete, fencing 
files.", keyExtent);
+        if (range.contains(tabletMetadata.getExtent())) {
+          if (firstCompleteContained == null) {
+            firstCompleteContained = tabletMetadata.getExtent();
+          }
+          lastCompletelyContained = tabletMetadata.getExtent();
+          // delete range completely contains tablet, so want to delete all 
the tablets files
+          filesToDelete.addAll(tabletMetadata.getFiles());
+        } else {
+          Preconditions.checkState(range.overlaps(tabletMetadata.getExtent()),
+              "%s tablet %s does not overlap delete range %s", fateStr, 
tabletMetadata.getExtent(),
+              range);
 
           // Create the ranges for fencing the files, this takes the place of
           // chop compactions and splits
@@ -284,8 +129,6 @@ public class DeleteRows extends ManagerRepo {
             final StoredTabletFile existing = entry.getKey();
             final DataFileValue value = entry.getValue();
 
-            final Mutation m = new Mutation(keyExtent.toMetaRow());
-
             // Go through each range that was created and modify the metadata 
for the file
             // The end row should be inclusive for the current tablet and the 
previous end row
             // should be exclusive for the start row.
@@ -301,7 +144,7 @@ public class DeleteRows extends ManagerRepo {
               // of the newFiles set which means it is disjoint with all ranges
               if (fenced != null) {
                 final StoredTabletFile newFile = 
StoredTabletFile.of(existing.getPath(), fenced);
-                log.trace("Adding new file {} with range {}", 
newFile.getMetadataPath(),
+                log.trace("{} Adding new file {} with range {}", fateStr, 
newFile.getMetadataPath(),
                     newFile.getRange());
 
                 // Add the new file to the newFiles set, it will be added 
later if it doesn't match
@@ -312,15 +155,13 @@ public class DeleteRows extends ManagerRepo {
                 // with all ranges.
                 newFiles.add(newFile);
               } else {
-                log.trace("Found a disjoint file {} with  range {} on delete",
+                log.trace("{} Found a disjoint file {} with  range {} on 
delete", fateStr,
                     existing.getMetadataPath(), existing.getRange());
               }
             }
 
             // If the existingFile is not contained in the newFiles set then 
we can delete it
-            Sets.difference(existingFile, newFiles).forEach(
-                delete -> 
m.putDelete(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME,
-                    existing.getMetadataText()));
+            filesToDelete.addAll(Sets.difference(existingFile, newFiles));
 
             // Add any new files that don't match the existingFile
             // As of now we will only have at most 2 files as up to 2 ranges 
are created
@@ -334,70 +175,70 @@ public class DeleteRows extends ManagerRepo {
               // This splits up the values in half and makes sure they total 
the original
               // values
               final Pair<DataFileValue,DataFileValue> newDfvs = 
computeNewDfv(value);
-              m.put(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME,
-                  filesToAdd.get(0).getMetadataText(), 
newDfvs.getFirst().encodeAsValue());
-              m.put(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME,
-                  filesToAdd.get(1).getMetadataText(), 
newDfvs.getSecond().encodeAsValue());
+              filesToAddMap.put(filesToAdd.get(0), newDfvs.getFirst());
+              filesToAddMap.put(filesToAdd.get(1), newDfvs.getSecond());
             } else {
               // Will be 0 or 1 files
-              filesToAdd
-                  .forEach(newFile -> 
m.put(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME,
-                      newFile.getMetadataText(), value.encodeAsValue()));
-            }
-
-            if (!m.getUpdates().isEmpty()) {
-              bw.addMutation(m);
+              filesToAdd.forEach(newFile -> filesToAddMap.put(newFile, value));
             }
           }
-        } else {
-          log.debug(
-              "Skipping metadata update on file for keyExtent {} for delete as 
not overlapping on rows.",
-              keyExtent);
         }
+
+        filesToDelete.forEach(file -> log.debug("{} deleting file {} for {}", 
fateStr, file,
+            tabletMetadata.getExtent()));
+        filesToAddMap.forEach((file, dfv) -> log.debug("{} adding file {} {} 
for {}", fateStr, file,
+            dfv, tabletMetadata.getExtent()));
+
+        filesToDelete.forEach(tabletMutator::deleteFile);
+        filesToAddMap.forEach(tabletMutator::putFile);
+
+        tabletMutator.submit(tm -> 
tm.getFiles().containsAll(filesToAddMap.keySet())
+            && Collections.disjoint(tm.getFiles(), filesToDelete));
       }
 
-      bw.flush();
+      var results = tabletsMutator.process();
+      verifyAccepted(results, fateStr);
 
-      return startAndEndTablets;
-    } catch (Exception ex) {
-      throw new AccumuloException(ex);
+      return computeMergeRange(range, firstCompleteContained, 
lastCompletelyContained);
     }
   }
 
-  // This method finds returns the deletion starting row (exclusive) for 
tablets that
-  // need to be actually deleted. If the startTablet is null then
-  // the deletion start row will just be null as all tablets are being deleted
-  // up to the end. Otherwise, this returns the endRow of the first tablet
-  // as the first tablet should be kept and will have been previously
-  // fenced if necessary
-  private Text getDeletionStartRow(final KeyExtent startTablet) {
-    if (startTablet == null) {
-      log.debug("First tablet for delete range is null");
-      return null;
+  /**
+   * Tablets that are completely contained in the delete range can be merged 
away. Use the first and
+   * last tablet were completely contained in the delete range to create a 
merge range.
+   */
+  private Optional<KeyExtent> computeMergeRange(KeyExtent deleteRange,
+      KeyExtent firstCompleteContained, KeyExtent lastCompletelyContained) {
+    if (deleteRange.prevEndRow() == null && deleteRange.endRow() == null) {
+      return Optional.empty();
     }
 
-    final Text deletionStartRow = startTablet.endRow();
-    log.debug("Start row is {} for deletion", deletionStartRow);
-
-    return deletionStartRow;
-  }
+    if (firstCompleteContained == null) {
+      return Optional.empty();
+    }
 
-  // This method finds returns the deletion ending row (inclusive) for tablets 
that
-  // need to be actually deleted. If the endTablet is null then
-  // the deletion end row will just be null as all tablets are being deleted
-  // after the start row. Otherwise, this returns the prevEndRow of the last 
tablet
-  // as the last tablet should be kept and will have been previously
-  // fenced if necessary
-  private Text getDeletionEndRow(final KeyExtent endTablet) {
-    if (endTablet == null) {
-      log.debug("Last tablet for delete range is null");
-      return null;
+    // Extend the merge range past the end of the last fully contained extent 
to merge that tablet
+    // away.
+    Text end = lastCompletelyContained.endRow();
+    if (end != null) {
+      end = new Key(end).followingKey(PartialKey.ROW).getRow();
     }
 
-    Text deletionEndRow = endTablet.prevEndRow();
-    log.debug("Deletion end row is {}", deletionEndRow);
+    return Optional
+        .of(new KeyExtent(deleteRange.tableId(), end, 
firstCompleteContained.prevEndRow()));
+  }
+
+  static void verifyAccepted(Map<KeyExtent,Ample.ConditionalResult> results, 
String fateStr) {
+    if (results.values().stream()
+        .anyMatch(conditionalResult -> conditionalResult.getStatus() != 
Status.ACCEPTED)) {
+      results.forEach(((extent, conditionalResult) -> {
+        if (conditionalResult.getStatus() != Status.ACCEPTED) {
+          log.error("{} failed to update {}", fateStr, extent);
+        }
+      }));
 
-    return deletionEndRow;
+      throw new IllegalStateException(fateStr + " failed to update tablet 
files");
+    }
   }
 
   static TabletHostingGoal getMergeHostingGoal(KeyExtent range, 
Set<TabletHostingGoal> goals) {
@@ -423,28 +264,6 @@ public class DeleteRows extends ManagerRepo {
     return new Pair<>(file1Value, file2Value);
   }
 
-  private Optional<TabletMetadata> loadTabletMetadata(Manager manager, TableId 
tabletId,
-      final Text row, TabletMetadata.ColumnType... columns) {
-    try (TabletsMetadata tabletsMetadata = 
manager.getContext().getAmple().readTablets()
-        .forTable(tabletId).overlapping(row, true, 
row).fetch(columns).build()) {
-      return tabletsMetadata.stream().findFirst();
-    }
-  }
-
-  // This method is used to detect if a tablet needs to be split/chopped for a 
delete
-  // Instead of performing a split or chop compaction, the tablet will have 
its files fenced.
-  private boolean needsFencingForDeletion(MergeInfo info, KeyExtent keyExtent) 
{
-    // Does this extent cover the end points of the delete?
-    final Predicate<Text> isWithin = r -> r != null && keyExtent.contains(r);
-    final Predicate<Text> isNotBoundary =
-        r -> !r.equals(keyExtent.endRow()) && 
!r.equals(keyExtent.prevEndRow());
-    final KeyExtent deleteRange = info.getExtent();
-
-    return (keyExtent.overlaps(deleteRange) && Stream
-        .of(deleteRange.prevEndRow(), 
deleteRange.endRow()).anyMatch(isWithin.and(isNotBoundary)))
-        || info.needsToBeChopped(keyExtent);
-  }
-
   // Instead of splitting or chopping tablets for a delete we instead create 
ranges
   // to exclude the portion of the tablet that should be deleted
   private Text followingRow(Text row) {
@@ -491,110 +310,4 @@ public class DeleteRows extends ManagerRepo {
 
     return ranges;
   }
-
-  private static boolean isFirstTabletInTable(KeyExtent tablet) {
-    return tablet != null && tablet.prevEndRow() == null;
-  }
-
-  private static boolean isLastTabletInTable(KeyExtent tablet) {
-    return tablet != null && tablet.endRow() == null;
-  }
-
-  private static boolean areContiguousTablets(KeyExtent firstTablet, KeyExtent 
lastTablet) {
-    return firstTablet != null && lastTablet != null
-        && Objects.equals(firstTablet.endRow(), lastTablet.prevEndRow());
-  }
-
-  private boolean hasTabletsToDelete(final KeyExtent firstTabletInRange,
-      final KeyExtent lastTableInRange) {
-    // If the tablets are equal (and not null) then the deletion range is just 
part of 1 tablet
-    // which will be fenced so there are no tablets to delete. The null check 
is because if both
-    // are null then we are just deleting everything, so we do have tablets to 
delete
-    if (Objects.equals(firstTabletInRange, lastTableInRange) && 
firstTabletInRange != null) {
-      log.trace(
-          "No tablets to delete, firstTablet {} equals lastTablet {} in 
deletion range and was fenced.",
-          firstTabletInRange, lastTableInRange);
-      return false;
-      // If the lastTablet of the deletion range is the first tablet of the 
table it has been fenced
-      // already so nothing to actually delete before it
-    } else if (isFirstTabletInTable(lastTableInRange)) {
-      log.trace(
-          "No tablets to delete, lastTablet {} in deletion range is the first 
tablet of the table and was fenced.",
-          lastTableInRange);
-      return false;
-      // If the firstTablet of the deletion range is the last tablet of the 
table it has been fenced
-      // already so nothing to actually delete after it
-    } else if (isLastTabletInTable(firstTabletInRange)) {
-      log.trace(
-          "No tablets to delete, firstTablet {} in deletion range is the last 
tablet of the table and was fenced.",
-          firstTabletInRange);
-      return false;
-      // If the firstTablet and lastTablet are contiguous tablets then there 
is nothing to delete as
-      // each will be fenced and nothing between
-    } else if (areContiguousTablets(firstTabletInRange, lastTableInRange)) {
-      log.trace(
-          "No tablets to delete, firstTablet {} and lastTablet {} in deletion 
range are contiguous and were fenced.",
-          firstTabletInRange, lastTableInRange);
-      return false;
-    }
-
-    return true;
-  }
-
-  static void deleteTablets(MergeInfo info, Range scanRange, BatchWriter bw, 
AccumuloClient client)
-      throws TableNotFoundException, MutationsRejectedException {
-    Scanner scanner;
-    Mutation m;
-    // Delete everything in the other tablets
-    // group all deletes into tablet into one mutation, this makes tablets
-    // either disappear entirely or not all.. this is important for the case
-    // where the process terminates in the loop below...
-    scanner = client.createScanner(info.getExtent().isMeta() ? RootTable.NAME 
: MetadataTable.NAME,
-        Authorizations.EMPTY);
-    log.debug("Deleting range {}", scanRange);
-    scanner.setRange(scanRange);
-    RowIterator rowIter = new RowIterator(scanner);
-    while (rowIter.hasNext()) {
-      Iterator<Map.Entry<Key,Value>> row = rowIter.next();
-      m = null;
-      while (row.hasNext()) {
-        Map.Entry<Key,Value> entry = row.next();
-        Key key = entry.getKey();
-
-        if (m == null) {
-          m = new Mutation(key.getRow());
-        }
-
-        m.putDelete(key.getColumnFamily(), key.getColumnQualifier());
-        log.debug("deleting entry {}", key);
-      }
-      bw.addMutation(m);
-    }
-
-    bw.flush();
-  }
-
-  static KeyExtent getHighTablet(Manager manager, KeyExtent range) throws 
AccumuloException {
-    try {
-      AccumuloClient client = manager.getContext();
-      Scanner scanner = client.createScanner(range.isMeta() ? RootTable.NAME : 
MetadataTable.NAME,
-          Authorizations.EMPTY);
-      
MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
-      KeyExtent start = new KeyExtent(range.tableId(), range.endRow(), null);
-      scanner.setRange(new Range(start.toMetaRow(), null));
-      Iterator<Map.Entry<Key,Value>> iterator = scanner.iterator();
-      if (!iterator.hasNext()) {
-        throw new AccumuloException("No last tablet for a merge " + range);
-      }
-      Map.Entry<Key,Value> entry = iterator.next();
-      KeyExtent highTablet = KeyExtent.fromMetaPrevRow(entry);
-      if (!highTablet.tableId().equals(range.tableId())) {
-        throw new AccumuloException("No last tablet for merge " + range + " " 
+ highTablet);
-      }
-      return highTablet;
-    } catch (Exception ex) {
-      throw new AccumuloException("Unexpected failure finding the last tablet 
for a merge " + range,
-          ex);
-    }
-  }
 }
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/FinishTableRangeOp.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/FinishTableRangeOp.java
index 574335dcf3..323ca6bc0f 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/FinishTableRangeOp.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/FinishTableRangeOp.java
@@ -18,54 +18,75 @@
  */
 package org.apache.accumulo.manager.tableOps.merge;
 
-import org.apache.accumulo.core.data.NamespaceId;
-import org.apache.accumulo.core.data.TableId;
+import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
+import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID;
+import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
+
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.fate.FateTxId;
 import org.apache.accumulo.core.fate.Repo;
+import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.TabletOperationId;
+import org.apache.accumulo.core.metadata.schema.TabletOperationType;
 import org.apache.accumulo.manager.Manager;
 import org.apache.accumulo.manager.tableOps.ManagerRepo;
 import org.apache.accumulo.manager.tableOps.Utils;
-import org.apache.accumulo.server.manager.state.MergeInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/**
- * ELASTICITY_TODO edit these docs which are pre elasticity changes. Best done 
after #3763
- *
- * Merge makes things hard.
- *
- * Typically, a client will read the list of tablets, and begin an operation 
on that tablet at the
- * location listed in the metadata table. When a tablet splits, the 
information read from the
- * metadata table doesn't match reality, so the operation fails, and must be 
retried. But the
- * operation will take place either on the parent, or at a later time on the 
children. It won't take
- * place on just half of the tablet.
- *
- * However, when a merge occurs, the operation may have succeeded on one 
section of the merged area,
- * and not on the others, when the merge occurs. There is no way to retry the 
request at a later
- * time on an unmodified tablet.
- *
- * The code below uses read-write lock to prevent some operations while a 
merge is taking place.
- * Normal operations, like bulk imports, will grab the read lock and prevent 
merges (writes) while
- * they run. Merge operations will lock out some operations while they run.
- */
+import com.google.common.base.Preconditions;
+
 class FinishTableRangeOp extends ManagerRepo {
   private static final Logger log = 
LoggerFactory.getLogger(FinishTableRangeOp.class);
 
   private static final long serialVersionUID = 1L;
-  private TableId tableId;
-  private NamespaceId namespaceId;
 
-  public FinishTableRangeOp(NamespaceId namespaceId, TableId tableId) {
-    this.tableId = tableId;
-    this.namespaceId = namespaceId;
+  private final MergeInfo data;
+
+  public FinishTableRangeOp(MergeInfo data) {
+    this.data = data;
   }
 
   @Override
   public Repo<Manager> call(long tid, Manager manager) throws Exception {
-    MergeInfo mergeInfo = manager.getMergeInfo(tableId);
-    log.info("removing merge information " + mergeInfo);
-    manager.clearMergeState(tableId);
-    Utils.unreserveTable(manager, tableId, tid, true);
-    Utils.unreserveNamespace(manager, namespaceId, tid, false);
+    KeyExtent range = data.getReserveExtent();
+    var opid = TabletOperationId.from(TabletOperationType.MERGING, tid);
+    log.debug("{} unreserving tablet in range {}", FateTxId.formatTid(tid), 
range);
+
+    try (var tablets = 
manager.getContext().getAmple().readTablets().forTable(data.tableId)
+        .overlapping(range.prevEndRow(), range.endRow()).fetch(PREV_ROW, 
LOCATION, OPID).build();
+        var tabletsMutator = 
manager.getContext().getAmple().conditionallyMutateTablets();) {
+      int opsDeleted = 0;
+      int count = 0;
+
+      for (var tabletMeta : tablets) {
+        if (opid.equals(tabletMeta.getOperationId())) {
+          
tabletsMutator.mutateTablet(tabletMeta.getExtent()).requireOperation(opid)
+              .deleteOperation().submit(tm -> 
!opid.equals(tm.getOperationId()));
+          opsDeleted++;
+        }
+        count++;
+      }
+
+      Preconditions.checkState(count > 0);
+
+      var results = tabletsMutator.process();
+      var deletesAccepted =
+          results.values().stream().filter(conditionalResult -> 
conditionalResult.getStatus()
+              == Ample.ConditionalResult.Status.ACCEPTED).count();
+
+      log.debug("{} deleted {}/{} opids out of {} tablets", 
FateTxId.formatTid(tid),
+          deletesAccepted, opsDeleted, count);
+
+      manager.getEventCoordinator().event(range, "Merge or deleterows 
completed %s",
+          FateTxId.formatTid(tid));
+
+      DeleteRows.verifyAccepted(results, FateTxId.formatTid(tid));
+      Preconditions.checkState(deletesAccepted == opsDeleted);
+    }
+
+    Utils.unreserveTable(manager, data.tableId, tid, true);
+    Utils.unreserveNamespace(manager, data.namespaceId, tid, false);
     return null;
   }
 
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeInfo.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeInfo.java
new file mode 100644
index 0000000000..0da5159b65
--- /dev/null
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeInfo.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager.tableOps.merge;
+
+import java.io.Serializable;
+
+import 
org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
+import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
+import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.NamespaceId;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.hadoop.io.Text;
+
+import com.google.common.base.Preconditions;
+
+public class MergeInfo implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  public enum Operation {
+    MERGE, DELETE,
+  }
+
+  final TableId tableId;
+  final NamespaceId namespaceId;
+  private final byte[] startRow;
+  private final byte[] endRow;
+  private final boolean mergeRangeSet;
+  private final byte[] mergeStartRow;
+  private final byte[] mergeEndRow;
+  final Operation op;
+
+  public MergeInfo(TableId tableId, NamespaceId namespaceId, byte[] startRow, 
byte[] endRow,
+      Operation op) {
+    this(tableId, namespaceId, startRow, endRow, null, op);
+  }
+
+  private MergeInfo(TableId tableId, NamespaceId namespaceId, byte[] startRow, 
byte[] endRow,
+      KeyExtent mergeRange, Operation op) {
+    this.tableId = tableId;
+    this.namespaceId = namespaceId;
+    this.startRow = startRow;
+    this.endRow = endRow;
+    this.op = op;
+    this.mergeRangeSet = mergeRange != null;
+    if (mergeRange != null) {
+      mergeStartRow =
+          mergeRange.prevEndRow() == null ? null : 
TextUtil.getBytes(mergeRange.prevEndRow());
+      mergeEndRow = mergeRange.endRow() == null ? null : 
TextUtil.getBytes(mergeRange.endRow());
+    } else {
+      mergeStartRow = null;
+      mergeEndRow = null;
+    }
+  }
+
+  public void validate() throws AcceptableThriftTableOperationException {
+    if (startRow != null && endRow != null) {
+      Text start = new Text(startRow);
+      Text end = new Text(endRow);
+
+      if (start.compareTo(end) >= 0) {
+        throw new AcceptableThriftTableOperationException(tableId.canonical(), 
null,
+            TableOperation.MERGE, TableOperationExceptionType.BAD_RANGE,
+            "start row must be less than end row");
+      }
+    }
+  }
+
+  public KeyExtent getOriginalExtent() {
+    return new KeyExtent(tableId, endRow == null ? null : new Text(endRow),
+        startRow == null ? null : new Text(startRow));
+  }
+
+  public KeyExtent getMergeExtent() {
+    if (mergeRangeSet) {
+      return new KeyExtent(tableId, mergeEndRow == null ? null : new 
Text(mergeEndRow),
+          mergeStartRow == null ? null : new Text(mergeStartRow));
+    } else {
+      return getOriginalExtent();
+    }
+  }
+
+  public KeyExtent getReserveExtent() {
+    switch (op) {
+      case MERGE:
+        return getOriginalExtent();
+      case DELETE: {
+        if (endRow == null) {
+          return getOriginalExtent();
+        } else {
+          // Extend the reserve range a bit because if the last tablet is 
completely contained in
+          // the delete range then it will be merged away. Merging the last 
tablet away will result
+          // in modifying the next tablet, so need to reserve it.
+          return new KeyExtent(tableId, new 
Key(endRow).followingKey(PartialKey.ROW).getRow(),
+              startRow == null ? null : new Text(startRow));
+        }
+      }
+      default:
+        throw new IllegalArgumentException("unknown op " + op);
+    }
+  }
+
+  public MergeInfo useMergeRange(KeyExtent mergeRange) {
+    Preconditions.checkArgument(op == Operation.DELETE);
+    Preconditions.checkArgument(getReserveExtent().contains(mergeRange));
+    return new MergeInfo(tableId, namespaceId, startRow, endRow, mergeRange, 
op);
+  }
+}
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java
index d6a7de9716..8d5dbc8d7c 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java
@@ -18,36 +18,43 @@
  */
 package org.apache.accumulo.manager.tableOps.merge;
 
+import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.DIR;
+import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.ECOMP;
+import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
+import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.HOSTING_GOAL;
+import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
+import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID;
+import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
+import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.TIME;
+import static 
org.apache.accumulo.manager.tableOps.merge.DeleteRows.verifyAccepted;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 
-import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.admin.TabletHostingGoal;
-import org.apache.accumulo.core.clientImpl.TabletHostingGoalUtil;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.NamespaceId;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.TableId;
-import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.fate.FateTxId;
 import org.apache.accumulo.core.fate.Repo;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.gc.ReferenceFile;
 import org.apache.accumulo.core.metadata.StoredTabletFile;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
 import org.apache.accumulo.core.metadata.schema.MetadataTime;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
-import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.metadata.schema.TabletOperationId;
+import org.apache.accumulo.core.metadata.schema.TabletOperationType;
 import org.apache.accumulo.manager.Manager;
 import org.apache.accumulo.manager.tableOps.ManagerRepo;
 import org.apache.accumulo.server.gc.AllVolumesDirectory;
-import org.apache.accumulo.server.manager.state.MergeInfo;
-import org.apache.accumulo.server.manager.state.MergeState;
 import org.apache.accumulo.server.tablets.TabletTime;
 import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
@@ -61,222 +68,180 @@ public class MergeTablets extends ManagerRepo {
 
   private static final Logger log = 
LoggerFactory.getLogger(MergeTablets.class);
 
-  private final NamespaceId namespaceId;
-  private final TableId tableId;
+  private final MergeInfo data;
 
-  public MergeTablets(NamespaceId namespaceId, TableId tableId) {
-    this.namespaceId = namespaceId;
-    this.tableId = tableId;
+  public MergeTablets(MergeInfo data) {
+    this.data = data;
   }
 
   @Override
   public Repo<Manager> call(long tid, Manager manager) throws Exception {
-    MergeInfo mergeInfo = manager.getMergeInfo(tableId);
-    Preconditions.checkState(mergeInfo.getState() == MergeState.MERGING);
-    Preconditions.checkState(!mergeInfo.isDelete());
+    mergeMetadataRecords(manager, tid);
+    return new FinishTableRangeOp(data);
+  }
 
-    var extent = mergeInfo.getExtent();
-    long tabletCount;
+  private void mergeMetadataRecords(Manager manager, long tid) throws 
AccumuloException {
+    var fateStr = FateTxId.formatTid(tid);
+    KeyExtent range = data.getMergeExtent();
+    log.debug("{} Merging metadata for {}", fateStr, range);
 
-    try (var tabletMeta = 
manager.getContext().getAmple().readTablets().forTable(extent.tableId())
-        .overlapping(extent.prevEndRow(), 
extent.endRow()).fetch(TabletMetadata.ColumnType.PREV_ROW)
-        .checkConsistency().build()) {
-      tabletCount = tabletMeta.stream().count();
-    }
+    var opid = TabletOperationId.from(TabletOperationType.MERGING, tid);
+    Set<TabletHostingGoal> goals = new HashSet<>();
+    MetadataTime maxLogicalTime = null;
+    List<ReferenceFile> dirs = new ArrayList<>();
+    Map<StoredTabletFile,DataFileValue> newFiles = new HashMap<>();
+    TabletMetadata firstTabletMeta = null;
+    TabletMetadata lastTabletMeta = null;
+
+    try (var tabletsMetadata = manager.getContext().getAmple().readTablets()
+        .forTable(range.tableId()).overlapping(range.prevEndRow(), 
range.endRow())
+        .fetch(OPID, LOCATION, HOSTING_GOAL, FILES, TIME, DIR, ECOMP, 
PREV_ROW).build()) {
+
+      int tabletsSeen = 0;
+
+      for (var tabletMeta : tabletsMetadata) {
+        Preconditions.checkState(lastTabletMeta == null,
+            "%s unexpectedly saw multiple last tablets %s %s", fateStr, 
tabletMeta.getExtent(),
+            range);
+        validateTablet(tabletMeta, fateStr, opid, data.tableId);
+
+        if (firstTabletMeta == null) {
+          firstTabletMeta = Objects.requireNonNull(tabletMeta);
+        }
 
-    if (tabletCount > 1) {
-      mergeMetadataRecords(manager, mergeInfo);
-    }
+        tabletsSeen++;
+
+        // want to gather the following for all tablets, including the last 
tablet
+        maxLogicalTime = TabletTime.maxMetadataTime(maxLogicalTime, 
tabletMeta.getTime());
+        goals.add(tabletMeta.getHostingGoal());
+
+        // determine if this is the last tablet in the merge range
+        boolean isLastTablet = (range.endRow() == null && 
tabletMeta.getExtent().endRow() == null)
+            || (range.endRow() != null && 
tabletMeta.getExtent().contains(range.endRow()));
+        if (isLastTablet) {
+          lastTabletMeta = tabletMeta;
+        } else {
+          // files for the last tablet need to specially handled, so only add 
other tablets files
+          // here
+          tabletMeta.getFilesMap().forEach((file, dfv) -> {
+            newFiles.put(fenceFile(tabletMeta.getExtent(), file), dfv);
+          });
+
+          // queue all tablets dirs except the last tablets to be added as GC 
candidates
+          dirs.add(new AllVolumesDirectory(range.tableId(), 
tabletMeta.getDirName()));
+        }
+      }
 
-    return new FinishTableRangeOp(namespaceId, tableId);
-  }
+      if (tabletsSeen == 1) {
+        // The merge range overlaps a single tablet, so there is nothing to 
do. This could be
+        // because there was only a single tablet before merge started or this 
operation completed
+        // but the process died and now its running a 2nd time.
+        return;
+      }
 
-  private void mergeMetadataRecords(Manager manager, MergeInfo info) throws 
AccumuloException {
-    KeyExtent range = info.getExtent();
-    log.debug("Merging metadata for {}", range);
-    KeyExtent stop = DeleteRows.getHighTablet(manager, range);
-    log.debug("Highest tablet is {}", stop);
-    Value firstPrevRowValue = null;
-    Text stopRow = stop.toMetaRow();
-    Text start = range.prevEndRow();
-    if (start == null) {
-      start = new Text();
-    }
-    Range scanRange = new 
Range(MetadataSchema.TabletsSection.encodeRow(range.tableId(), start),
-        false, stopRow, false);
-    String targetSystemTable = MetadataTable.NAME;
-    if (range.isMeta()) {
-      targetSystemTable = RootTable.NAME;
+      Preconditions.checkState(lastTabletMeta != null, "%s no tablets seen in 
range %s", opid,
+          lastTabletMeta);
     }
-    Set<TabletHostingGoal> goals = new HashSet<>();
-
-    AccumuloClient client = manager.getContext();
-
-    KeyExtent stopExtent = KeyExtent.fromMetaRow(stop.toMetaRow());
-    KeyExtent previousKeyExtent = null;
-    KeyExtent lastExtent = null;
-
-    try (BatchWriter bw = client.createBatchWriter(targetSystemTable)) {
-      long fileCount = 0;
-      // Make file entries in highest tablet
-      Scanner scanner = client.createScanner(targetSystemTable, 
Authorizations.EMPTY);
-      // Update to set the range to include the highest tablet
-      scanner.setRange(new 
Range(MetadataSchema.TabletsSection.encodeRow(range.tableId(), start),
-          false, stopRow, true));
-      
MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
-      
MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN.fetch(scanner);
-      
MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner);
-      
MetadataSchema.TabletsSection.HostingColumnFamily.GOAL_COLUMN.fetch(scanner);
-      
scanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
-      Mutation m = new Mutation(stopRow);
-      MetadataTime maxLogicalTime = null;
-      for (Map.Entry<Key,Value> entry : scanner) {
-        Key key = entry.getKey();
-        Value value = entry.getValue();
-
-        final KeyExtent keyExtent = KeyExtent.fromMetaRow(key.getRow());
-
-        // Keep track of the last Key Extent seen so we can use it to fence
-        // of RFiles when merging the metadata
-        if (lastExtent != null && !keyExtent.equals(lastExtent)) {
-          previousKeyExtent = lastExtent;
-        }
 
-        // Special case to handle the highest/stop tablet, which is where 
files are
-        // merged to. The existing merge code won't delete files from this 
tablet
-        // so we need to handle the deletes in this tablet when fencing files.
-        // We may be able to make this simpler in the future.
-        if (keyExtent.equals(stopExtent)) {
-          if (previousKeyExtent != null && key.getColumnFamily()
-              
.equals(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME)) {
-
-            // Fence off existing files by the end row of the previous tablet 
and current tablet
-            final StoredTabletFile existing = 
StoredTabletFile.of(key.getColumnQualifier());
-            // The end row should be inclusive for the current tablet and the 
previous end row
-            // should be exclusive for the start row
-            Range fenced = new Range(previousKeyExtent.endRow(), false, 
keyExtent.endRow(), true);
-
-            // Clip range if exists
-            fenced = existing.hasRange() ? existing.getRange().clip(fenced) : 
fenced;
-
-            final StoredTabletFile newFile = 
StoredTabletFile.of(existing.getPath(), fenced);
-            // If the existing metadata does not match then we need to delete 
the old
-            // and replace with a new range
-            if (!existing.equals(newFile)) {
-              
m.putDelete(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME,
-                  existing.getMetadataText());
-              m.put(key.getColumnFamily(), newFile.getMetadataText(), value);
-            }
-
-            fileCount++;
+    log.info("{} merge low tablet {}", fateStr, firstTabletMeta.getExtent());
+    log.info("{} merge high tablet {}", fateStr, lastTabletMeta.getExtent());
+
+    // Check if the last tablet was already updated, this could happen if a 
process died and this
+    // code is running a 2nd time. If running a 2nd time it possible the last 
tablet was updated and
+    // only a subset of the other tablets were deleted. If the last tablet was 
never updated, then
+    // its prev row should be the greatest.
+    Comparator<Text> prevRowComparator = 
Comparator.nullsFirst(Text::compareTo);
+    if (prevRowComparator.compare(firstTabletMeta.getPrevEndRow(), 
lastTabletMeta.getPrevEndRow())
+        < 0) {
+      // update the last tablet
+      try (var tabletsMutator = 
manager.getContext().getAmple().conditionallyMutateTablets()) {
+        var lastExtent = lastTabletMeta.getExtent();
+        var tabletMutator =
+            
tabletsMutator.mutateTablet(lastExtent).requireOperation(opid).requireAbsentLocation();
+
+        // fence the files in the last tablet if needed
+        lastTabletMeta.getFilesMap().forEach((file, dfv) -> {
+          var fencedFile = fenceFile(lastExtent, file);
+          // If the existing metadata does not match then we need to delete 
the old
+          // and replace with a new range
+          if (!fencedFile.equals(file)) {
+            tabletMutator.deleteFile(file);
+            tabletMutator.putFile(fencedFile, dfv);
           }
-          // For the highest tablet we only care about the DataFileColumnFamily
-          continue;
-        }
+        });
 
-        // Handle metadata updates for all other tablets except the highest 
tablet
-        // Ranges are created for the files and then added to the highest 
tablet in
-        // the merge range. Deletes are handled later for the old files when 
the tablets
-        // are removed.
-        if 
(key.getColumnFamily().equals(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME))
 {
-          final StoredTabletFile existing = 
StoredTabletFile.of(key.getColumnQualifier());
-
-          // Fence off files by the previous tablet and current tablet that is 
being merged
-          // The end row should be inclusive for the current tablet and the 
previous end row should
-          // be exclusive for the start row.
-          Range fenced = new Range(previousKeyExtent != null ? 
previousKeyExtent.endRow() : null,
-              false, keyExtent.endRow(), true);
-
-          // Clip range with the tablet range if the range already exists
-          fenced = existing.hasRange() ? existing.getRange().clip(fenced) : 
fenced;
-
-          // Move the file and range to the last tablet
-          StoredTabletFile newFile = StoredTabletFile.of(existing.getPath(), 
fenced);
-          m.put(key.getColumnFamily(), newFile.getMetadataText(), value);
-
-          fileCount++;
-        } else if 
(MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)
-            && firstPrevRowValue == null) {
-          log.debug("prevRow entry for lowest tablet is {}", value);
-          firstPrevRowValue = new Value(value);
-        } else if 
(MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN.hasColumns(key)) {
-          maxLogicalTime =
-              TabletTime.maxMetadataTime(maxLogicalTime, 
MetadataTime.parse(value.toString()));
-        } else if 
(MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN
-            .hasColumns(key)) {
-          var allVolumesDir = new AllVolumesDirectory(range.tableId(), 
value.toString());
-          
bw.addMutation(manager.getContext().getAmple().createDeleteMutation(allVolumesDir));
-        } else if 
(MetadataSchema.TabletsSection.HostingColumnFamily.GOAL_COLUMN.hasColumns(key)) 
{
-          TabletHostingGoal thisGoal = TabletHostingGoalUtil.fromValue(value);
-          goals.add(thisGoal);
-        }
+        newFiles.forEach(tabletMutator::putFile);
+        tabletMutator.putTime(maxLogicalTime);
+        lastTabletMeta.getExternalCompactions().keySet()
+            .forEach(tabletMutator::deleteExternalCompaction);
+        tabletMutator.putHostingGoal(DeleteRows.getMergeHostingGoal(range, 
goals));
+        tabletMutator.putPrevEndRow(firstTabletMeta.getPrevEndRow());
 
-        lastExtent = keyExtent;
-      }
+        // if the tablet no longer exists (because changed prev end row, then 
the update was
+        // successful.
+        tabletMutator.submit(Ample.RejectionHandler.acceptAbsentTablet());
 
-      // read the logical time from the last tablet in the merge range, it is 
not included in
-      // the loop above
-      scanner = client.createScanner(targetSystemTable, Authorizations.EMPTY);
-      scanner.setRange(new Range(stopRow));
-      
MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN.fetch(scanner);
-      
MetadataSchema.TabletsSection.HostingColumnFamily.GOAL_COLUMN.fetch(scanner);
-      
scanner.fetchColumnFamily(MetadataSchema.TabletsSection.ExternalCompactionColumnFamily.NAME);
-      Set<String> extCompIds = new HashSet<>();
-      for (Map.Entry<Key,Value> entry : scanner) {
-        if (MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN
-            .hasColumns(entry.getKey())) {
-          maxLogicalTime = TabletTime.maxMetadataTime(maxLogicalTime,
-              MetadataTime.parse(entry.getValue().toString()));
-        } else if 
(MetadataSchema.TabletsSection.ExternalCompactionColumnFamily.NAME
-            .equals(entry.getKey().getColumnFamily())) {
-          extCompIds.add(entry.getKey().getColumnQualifierData().toString());
-        } else if 
(MetadataSchema.TabletsSection.HostingColumnFamily.GOAL_COLUMN
-            .hasColumns(entry.getKey())) {
-          TabletHostingGoal thisGoal = 
TabletHostingGoalUtil.fromValue(entry.getValue());
-          goals.add(thisGoal);
-        }
+        verifyAccepted(tabletsMutator.process(), fateStr);
       }
+    }
 
-      if (maxLogicalTime != null) {
-        MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN.put(m,
-            new Value(maxLogicalTime.encode()));
-      }
+    // add gc candidates for the tablet dirs that being merged away, once 
these dirs are empty the
+    // Accumulo GC will delete the dir
+    manager.getContext().getAmple().putGcFileAndDirCandidates(range.tableId(), 
dirs);
 
-      // delete any entries for external compactions
-      extCompIds.forEach(ecid -> m
-          
.putDelete(MetadataSchema.TabletsSection.ExternalCompactionColumnFamily.STR_NAME,
 ecid));
+    // delete tablets
+    try (
+        var tabletsMetadata =
+            
manager.getContext().getAmple().readTablets().forTable(range.tableId())
+                .overlapping(range.prevEndRow(), 
range.endRow()).saveKeyValues().build();
+        var tabletsMutator = 
manager.getContext().getAmple().conditionallyMutateTablets()) {
 
-      // Set the TabletHostingGoal for this tablet based on the goals of the 
other tablets in
-      // the merge range. Always takes priority over never.
-      TabletHostingGoal mergeHostingGoal = 
DeleteRows.getMergeHostingGoal(range, goals);
-      MetadataSchema.TabletsSection.HostingColumnFamily.GOAL_COLUMN.put(m,
-          TabletHostingGoalUtil.toValue(mergeHostingGoal));
+      for (var tabletMeta : tabletsMetadata) {
+        validateTablet(tabletMeta, fateStr, opid, data.tableId);
 
-      if (!m.getUpdates().isEmpty()) {
-        bw.addMutation(m);
-      }
+        // do not delete the last tablet
+        if (Objects.equals(tabletMeta.getExtent().endRow(), 
lastTabletMeta.getExtent().endRow())) {
+          break;
+        }
 
-      bw.flush();
+        var tabletMutator = tabletsMutator.mutateTablet(tabletMeta.getExtent())
+            .requireOperation(opid).requireAbsentLocation();
 
-      log.debug("Moved {} files to {}", fileCount, stop);
+        tabletMeta.getKeyValues().keySet().forEach(key -> {
+          log.debug("{} deleting {}", fateStr, key);
+        });
 
-      if (firstPrevRowValue == null) {
-        log.debug("tablet already merged");
-        return;
+        tabletMutator.deleteAll(tabletMeta.getKeyValues().keySet());
+        // if the tablet no longer exists, then it was successful
+        tabletMutator.submit(Ample.RejectionHandler.acceptAbsentTablet());
       }
 
-      stop = new KeyExtent(stop.tableId(), stop.endRow(),
-          
MetadataSchema.TabletsSection.TabletColumnFamily.decodePrevEndRow(firstPrevRowValue));
-      Mutation updatePrevRow =
-          
MetadataSchema.TabletsSection.TabletColumnFamily.createPrevRowMutation(stop);
-      log.debug("Setting the prevRow for last tablet: {}", stop);
-      bw.addMutation(updatePrevRow);
-      bw.flush();
+      verifyAccepted(tabletsMutator.process(), fateStr);
+    }
+  }
 
-      DeleteRows.deleteTablets(info, scanRange, bw, client);
+  static void validateTablet(TabletMetadata tabletMeta, String fateStr, 
TabletOperationId opid,
+      TableId expectedTableId) {
+    // its expected at this point that tablets have our operation id and no 
location, so lets
+    // check that
+    Preconditions.checkState(tabletMeta.getLocation() == null,
+        "%s merging tablet %s had location %s", fateStr, 
tabletMeta.getExtent(),
+        tabletMeta.getLocation());
+    Preconditions.checkState(opid.equals(tabletMeta.getOperationId()),
+        "%s merging tablet %s had unexpected opid %s", fateStr, 
tabletMeta.getExtent(),
+        tabletMeta.getOperationId());
+    Preconditions.checkState(expectedTableId.equals(tabletMeta.getTableId()),
+        "%s tablet %s has unexpected table id %s expected %s", fateStr, 
tabletMeta.getExtent(),
+        tabletMeta.getTableId(), expectedTableId);
+  }
 
-    } catch (Exception ex) {
-      throw new AccumuloException(ex);
-    }
+  /**
+   * Fence a file to a tablets data range.
+   */
+  private static StoredTabletFile fenceFile(KeyExtent extent, StoredTabletFile 
file) {
+    Range fenced = extent.toDataRange();
+    // Clip range if exists
+    fenced = file.hasRange() ? file.getRange().clip(fenced) : fenced;
+    return StoredTabletFile.of(file.getPath(), fenced);
   }
 }
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java
new file mode 100644
index 0000000000..04a5511b68
--- /dev/null
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager.tableOps.merge;
+
+import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
+import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID;
+import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
+
+import org.apache.accumulo.core.fate.FateTxId;
+import org.apache.accumulo.core.fate.Repo;
+import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status;
+import org.apache.accumulo.core.metadata.schema.TabletOperationId;
+import org.apache.accumulo.core.metadata.schema.TabletOperationType;
+import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.manager.tableOps.ManagerRepo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class ReserveTablets extends ManagerRepo {
+
+  private static final Logger log = 
LoggerFactory.getLogger(ReserveTablets.class);
+
+  private static final long serialVersionUID = 1L;
+
+  private final MergeInfo data;
+
+  public ReserveTablets(MergeInfo data) {
+    this.data = data;
+  }
+
+  @Override
+  public long isReady(long tid, Manager env) throws Exception {
+    var range = data.getReserveExtent();
+    log.debug("{} reserving tablets in range {}", FateTxId.formatTid(tid), 
range);
+    var opid = TabletOperationId.from(TabletOperationType.MERGING, tid);
+
+    try (
+        var tablets = 
env.getContext().getAmple().readTablets().forTable(data.tableId)
+            .overlapping(range.prevEndRow(), range.endRow()).fetch(PREV_ROW, 
LOCATION, OPID)
+            .checkConsistency().build();
+        var tabletsMutator = 
env.getContext().getAmple().conditionallyMutateTablets();) {
+
+      int count = 0;
+      int otherOps = 0;
+      int opsSet = 0;
+      int locations = 0;
+
+      for (var tabletMeta : tablets) {
+
+        if (tabletMeta.getOperationId() == null) {
+          
tabletsMutator.mutateTablet(tabletMeta.getExtent()).requireAbsentOperation()
+              .putOperation(opid).submit(tm -> 
opid.equals(tm.getOperationId()));
+          opsSet++;
+        } else if (!tabletMeta.getOperationId().equals(opid)) {
+          otherOps++;
+        }
+
+        if (tabletMeta.getLocation() != null) {
+          locations++;
+        }
+
+        count++;
+      }
+
+      var opsAccepted = tabletsMutator.process().values().stream()
+          .filter(conditionalResult -> conditionalResult.getStatus() == 
Status.ACCEPTED).count();
+
+      log.debug(
+          "{} reserve tablets op:{} count:{} other opids:{} opids set:{} 
locations:{} accepted:{}",
+          FateTxId.formatTid(tid), data.op, count, otherOps, opsSet, 
locations, opsAccepted);
+
+      // while there are table lock a tablet can be concurrently deleted, so 
should always see
+      // tablets
+      Preconditions.checkState(count > 0);
+
+      if (locations > 0 && opsAccepted > 0) {
+        // operation ids were set and tablets have locations, so lets send a 
signal to get them
+        // unassigned
+        env.getEventCoordinator().event(range, "Tablets %d were reserved for 
merge %s", opsAccepted,
+            FateTxId.formatTid(tid));
+      }
+
+      if (locations > 0 || otherOps > 0) {
+        // need to wait on these tablets
+        return Math.max(1000, count);
+      }
+
+      if (opsSet != opsAccepted) {
+        // not all operation ids were set
+        return Math.max(1000, count);
+      }
+
+      // operations ids were set on all tablets and no tablets have locations, 
so ready
+      return 0;
+    }
+  }
+
+  @Override
+  public Repo<Manager> call(long tid, Manager environment) throws Exception {
+    if (data.op == MergeInfo.Operation.MERGE) {
+      return new MergeTablets(data);
+    } else {
+      return new DeleteRows(data);
+    }
+  }
+}
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/TableRangeOp.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/TableRangeOp.java
index fa1f587e3a..719f4bf1cf 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/TableRangeOp.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/TableRangeOp.java
@@ -18,21 +18,15 @@
  */
 package org.apache.accumulo.manager.tableOps.merge;
 
-import 
org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
 import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
-import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
 import org.apache.accumulo.core.data.NamespaceId;
 import org.apache.accumulo.core.data.TableId;
-import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.fate.Repo;
 import org.apache.accumulo.core.metadata.RootTable;
 import org.apache.accumulo.core.util.TextUtil;
 import org.apache.accumulo.manager.Manager;
 import org.apache.accumulo.manager.tableOps.ManagerRepo;
 import org.apache.accumulo.manager.tableOps.Utils;
-import org.apache.accumulo.server.manager.state.MergeInfo;
-import org.apache.accumulo.server.manager.state.MergeInfo.Operation;
-import org.apache.accumulo.server.manager.state.MergeState;
 import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,70 +36,39 @@ public class TableRangeOp extends ManagerRepo {
 
   private static final long serialVersionUID = 1L;
 
-  private final TableId tableId;
-  private final NamespaceId namespaceId;
-  private byte[] startRow;
-  private byte[] endRow;
-  private Operation op;
+  private final MergeInfo data;
 
   @Override
   public long isReady(long tid, Manager env) throws Exception {
-    return Utils.reserveNamespace(env, namespaceId, tid, false, true, 
TableOperation.MERGE)
-        + Utils.reserveTable(env, tableId, tid, true, true, 
TableOperation.MERGE);
+    return Utils.reserveNamespace(env, data.namespaceId, tid, false, true, 
TableOperation.MERGE)
+        + Utils.reserveTable(env, data.tableId, tid, true, true, 
TableOperation.MERGE);
   }
 
   public TableRangeOp(MergeInfo.Operation op, NamespaceId namespaceId, TableId 
tableId,
       Text startRow, Text endRow) {
-    this.tableId = tableId;
-    this.namespaceId = namespaceId;
-    this.startRow = TextUtil.getBytes(startRow);
-    this.endRow = TextUtil.getBytes(endRow);
-    this.op = op;
+    byte[] start = startRow.getLength() == 0 ? null : 
TextUtil.getBytes(startRow);
+    byte[] end = endRow.getLength() == 0 ? null : TextUtil.getBytes(endRow);
+    this.data = new MergeInfo(tableId, namespaceId, start, end, op);
   }
 
   @Override
   public Repo<Manager> call(long tid, Manager env) throws Exception {
 
-    if (RootTable.ID.equals(tableId) && Operation.MERGE.equals(op)) {
+    if (RootTable.ID.equals(data.tableId) && 
MergeInfo.Operation.MERGE.equals(data.op)) {
       log.warn("Attempt to merge tablets for {} does nothing. It is not 
splittable.",
           RootTable.NAME);
     }
 
-    Text start = startRow.length == 0 ? null : new Text(startRow);
-    Text end = endRow.length == 0 ? null : new Text(endRow);
+    env.mustBeOnline(data.tableId);
 
-    if (start != null && end != null) {
-      if (start.compareTo(end) >= 0) {
-        throw new AcceptableThriftTableOperationException(tableId.canonical(), 
null,
-            TableOperation.MERGE, TableOperationExceptionType.BAD_RANGE,
-            "start row must be less than end row");
-      }
-    }
-
-    env.mustBeOnline(tableId);
+    data.validate();
 
-    MergeInfo info = env.getMergeInfo(tableId);
-
-    // ELASTICITY_TODO can remove MergeState and MergeInfo once opid is set, 
these only exists now
-    // to get tablets unassigned. Once an opid is set on a tablet it will be 
unassigned. See #3763
-    if (info.getState() == MergeState.NONE) {
-      KeyExtent range = new KeyExtent(tableId, end, start);
-      env.setMergeState(new MergeInfo(range, op), 
MergeState.WAITING_FOR_OFFLINE);
-    }
-
-    return new WaitForOffline(namespaceId, tableId);
+    return new ReserveTablets(data);
   }
 
   @Override
   public void undo(long tid, Manager env) throws Exception {
-    // Not sure this is a good thing to do. The Manager state engine should be 
the one to remove it.
-    MergeInfo mergeInfo = env.getMergeInfo(tableId);
-    if (mergeInfo.getState() != MergeState.NONE) {
-      log.info("removing merge information {}", mergeInfo);
-    }
-    env.clearMergeState(tableId);
-    Utils.unreserveNamespace(env, namespaceId, tid, false);
-    Utils.unreserveTable(env, tableId, tid, true);
+    Utils.unreserveNamespace(env, data.namespaceId, tid, false);
+    Utils.unreserveTable(env, data.tableId, tid, true);
   }
-
 }
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/WaitForOffline.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/WaitForOffline.java
deleted file mode 100644
index c55ff213c5..0000000000
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/WaitForOffline.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.manager.tableOps.merge;
-
-import org.apache.accumulo.core.data.NamespaceId;
-import org.apache.accumulo.core.data.TableId;
-import org.apache.accumulo.core.fate.FateTxId;
-import org.apache.accumulo.core.fate.Repo;
-import org.apache.accumulo.core.metadata.schema.TabletMetadata;
-import org.apache.accumulo.manager.Manager;
-import org.apache.accumulo.manager.tableOps.ManagerRepo;
-import org.apache.accumulo.server.manager.state.MergeInfo;
-import org.apache.accumulo.server.manager.state.MergeState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class WaitForOffline extends ManagerRepo {
-
-  private static final Logger log = 
LoggerFactory.getLogger(WaitForOffline.class);
-
-  private static final long serialVersionUID = 1L;
-
-  private final NamespaceId namespaceId;
-  private final TableId tableId;
-
-  public WaitForOffline(NamespaceId namespaceId, TableId tableId) {
-    this.namespaceId = namespaceId;
-    this.tableId = tableId;
-  }
-
-  @Override
-  public long isReady(long tid, Manager env) throws Exception {
-    MergeInfo mergeInfo = env.getMergeInfo(tableId);
-    var extent = mergeInfo.getExtent();
-
-    long tabletsWithLocations;
-
-    try (var tabletMeta = 
env.getContext().getAmple().readTablets().forTable(extent.tableId())
-        .overlapping(extent.prevEndRow(), extent.endRow())
-        .fetch(TabletMetadata.ColumnType.PREV_ROW, 
TabletMetadata.ColumnType.LOCATION)
-        .checkConsistency().build()) {
-      tabletsWithLocations = tabletMeta.stream().filter(tm -> tm.getLocation() 
!= null).count();
-    }
-
-    log.info("{} waiting for {} tablets with locations", 
FateTxId.formatTid(tid),
-        tabletsWithLocations);
-
-    if (tabletsWithLocations > 0) {
-      return 1000;
-    } else {
-      return 0;
-    }
-  }
-
-  @Override
-  public Repo<Manager> call(long tid, Manager env) throws Exception {
-    MergeInfo mergeInfo = env.getMergeInfo(tableId);
-    env.setMergeState(mergeInfo, MergeState.MERGING);
-    if (mergeInfo.isDelete()) {
-      return new DeleteRows(namespaceId, tableId);
-    } else {
-      return new MergeTablets(namespaceId, tableId);
-    }
-  }
-}
diff --git a/test/src/main/java/org/apache/accumulo/test/ComprehensiveIT.java 
b/test/src/main/java/org/apache/accumulo/test/ComprehensiveIT.java
index cec2e224c6..fb7215216b 100644
--- a/test/src/main/java/org/apache/accumulo/test/ComprehensiveIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ComprehensiveIT.java
@@ -177,7 +177,6 @@ public class ComprehensiveIT extends SharedMiniClusterBase {
       client.tableOperations().merge(table, null, null);
       assertEquals(Set.of(), new 
TreeSet<>(client.tableOperations().listSplits(table)));
       verifyData(client, table, AUTHORIZATIONS, expectedData);
-
     }
   }
 
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/DeleteRowsIT.java 
b/test/src/main/java/org/apache/accumulo/test/functional/DeleteRowsIT.java
index 88768a7ac6..fc8dc9fb3f 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/DeleteRowsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/DeleteRowsIT.java
@@ -220,6 +220,7 @@ public class DeleteRowsIT extends AccumuloClusterHarness {
 
     Text startText = start == null ? null : new Text(start);
     Text endText = end == null ? null : new Text(end);
+
     c.tableOperations().deleteRows(table, startText, endText);
     Collection<Text> remainingSplits = c.tableOperations().listSplits(table);
     StringBuilder sb = new StringBuilder();
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java
 
b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java
index 36fa313fd7..9c3c2a37f8 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java
@@ -23,7 +23,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -73,7 +72,6 @@ import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
 import org.apache.accumulo.server.manager.state.CurrentState;
-import org.apache.accumulo.server.manager.state.MergeInfo;
 import org.apache.accumulo.server.manager.state.TabletManagementIterator;
 import org.apache.hadoop.io.Text;
 import org.junit.jupiter.api.Test;
@@ -164,18 +162,6 @@ public class TabletManagementIteratorIT extends 
AccumuloClusterHarness {
       assertEquals(1, findTabletsNeedingAttention(client, metaCopy2, state),
           "Only 1 of 2 tablets in table t1 should be returned");
 
-      // test the cases where there is ongoing merges
-      state = new State(client) {
-        @Override
-        public Collection<MergeInfo> merges() {
-          TableId tableIdToModify = 
TableId.of(client.tableOperations().tableIdMap().get(t3));
-          return Collections.singletonList(
-              new MergeInfo(new KeyExtent(tableIdToModify, null, null), 
MergeInfo.Operation.MERGE));
-        }
-      };
-      assertEquals(1, findTabletsNeedingAttention(client, metaCopy2, state),
-          "Should have 2 tablets that need to be chopped or unassigned");
-
       // test the bad tablet location state case (inconsistent metadata)
       state = new State(client);
       addDuplicateLocation(client, metaCopy3, t3);
@@ -399,11 +385,6 @@ public class TabletManagementIteratorIT extends 
AccumuloClusterHarness {
       return new HashMap<>();
     }
 
-    @Override
-    public Collection<MergeInfo> merges() {
-      return Collections.emptySet();
-    }
-
     @Override
     public Set<KeyExtent> migrationsSnapshot() {
       return Collections.emptySet();

Reply via email to