This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new fb57b5356b Create shared ConditionalWriter in Ample (#5763)
fb57b5356b is described below

commit fb57b5356ba0016a9bb23cba14453929efd736d3
Author: Dom G. <[email protected]>
AuthorDate: Thu Jul 31 16:29:07 2025 -0400

    Create shared ConditionalWriter in Ample (#5763)
    
    * Create shared ConditionalWriter in Ample
    
    ---------
    
    Co-authored-by: Keith Turner <[email protected]>
---
 .../org/apache/accumulo/core/conf/Property.java    |   5 +-
 .../org/apache/accumulo/server/ServerContext.java  |  52 +++++
 .../metadata/ConditionalTabletsMutatorImpl.java    |  32 ++-
 .../accumulo/server/metadata/ServerAmpleImpl.java  |   7 +-
 .../ConditionalTabletsMutatorImplTest.java         |  65 +++---
 .../test/functional/AmpleConditionalWriterIT.java  | 221 ++++++++++-----------
 6 files changed, 232 insertions(+), 150 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index f6cc6b12e8..35a98bafa2 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -1304,7 +1304,10 @@ public enum Property {
       "4.0.0"),
   COMPACTION_COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL(
       "compaction.coordinator.compactor.dead.check.interval", "5m", 
PropertyType.TIMEDURATION,
-      "The interval at which to check for dead compactors.", "2.1.0");
+      "The interval at which to check for dead compactors.", "2.1.0"),
+  
GENERAL_AMPLE_CONDITIONAL_WRITER_THREADS_MAX("general.ample.conditional.writer.threads.max",
 "8",
+      PropertyType.COUNT,
+      "The maximum number of threads for the shared ConditionalWriter used by 
Ample.", "4.0.0");
 
   private final String key;
   private final String defaultValue;
diff --git a/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java b/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
index b0b9c02212..3bf5450391 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
@@ -42,6 +42,9 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 
 import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.core.client.ConditionalWriterConfig;
+import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.clientImpl.ClientContext;
 import org.apache.accumulo.core.clientImpl.ClientInfo;
 import org.apache.accumulo.core.clientImpl.ThriftTransportPool;
@@ -55,6 +58,7 @@ import org.apache.accumulo.core.data.NamespaceId;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.lock.ServiceLock;
 import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
 import org.apache.accumulo.core.metrics.MetricsInfo;
 import org.apache.accumulo.core.rpc.SslConnectionParams;
 import org.apache.accumulo.core.spi.crypto.CryptoServiceFactory;
@@ -107,9 +111,12 @@ public class ServerContext extends ClientContext {
   private final Supplier<LowMemoryDetector> lowMemoryDetector;
   private final AtomicReference<ServiceLock> serverLock = new 
AtomicReference<>();
   private final Supplier<MetricsInfo> metricsInfoSupplier;
+  private final Supplier<ConditionalWriter> sharedMetadataWriter;
+  private final Supplier<ConditionalWriter> sharedUserWriter;
 
   private final AtomicBoolean metricsInfoCreated = new AtomicBoolean(false);
   private final AtomicBoolean sharedSchedExecutorCreated = new 
AtomicBoolean(false);
+  private final AtomicBoolean sharedWritersCreated = new AtomicBoolean(false);
 
   public ServerContext(SiteConfiguration siteConfig) {
     this(ServerInfo.fromServerConfig(siteConfig));
@@ -138,6 +145,9 @@ public class ServerContext extends ClientContext {
             SecurityOperation.getAuthenticator(this), 
SecurityOperation.getPermHandler(this)));
     lowMemoryDetector = memoize(() -> new LowMemoryDetector());
     metricsInfoSupplier = memoize(() -> new MetricsInfoImpl(this));
+
+    sharedMetadataWriter = memoize(() -> 
createSharedConditionalWriter(DataLevel.METADATA));
+    sharedUserWriter = memoize(() -> 
createSharedConditionalWriter(DataLevel.USER));
   }
 
   /**
@@ -477,6 +487,29 @@ public class ServerContext extends ClientContext {
     return metricsInfoSupplier.get();
   }
 
+  private ConditionalWriter createSharedConditionalWriter(DataLevel level) {
+    try {
+      int maxThreads =
+          
getConfiguration().getCount(Property.GENERAL_AMPLE_CONDITIONAL_WRITER_THREADS_MAX);
+      var config = new 
ConditionalWriterConfig().setMaxWriteThreads(maxThreads);
+      String tableName = level.metaTable();
+      log.info("Creating shared ConditionalWriter for DataLevel {} with max 
threads: {}", level,
+          maxThreads);
+      sharedWritersCreated.set(true);
+      return createConditionalWriter(tableName, config);
+    } catch (TableNotFoundException e) {
+      throw new RuntimeException("Failed to create shared ConditionalWriter 
for level " + level, e);
+    }
+  }
+
+  public Supplier<ConditionalWriter> getSharedMetadataWriter() {
+    return sharedMetadataWriter;
+  }
+
+  public Supplier<ConditionalWriter> getSharedUserWriter() {
+    return sharedUserWriter;
+  }
+
   @Override
   public void close() {
     Preconditions.checkState(!isClosed(), "ServerContext.close was already 
called.");
@@ -486,6 +519,25 @@ public class ServerContext extends ClientContext {
     if (sharedSchedExecutorCreated.get()) {
       getScheduledExecutor().shutdownNow();
     }
+    if (sharedWritersCreated.get()) {
+      try {
+        ConditionalWriter writer = sharedMetadataWriter.get();
+        if (writer != null) {
+          writer.close();
+        }
+      } catch (Exception e) {
+        log.warn("Error closing shared metadata ConditionalWriter", e);
+      }
+
+      try {
+        ConditionalWriter writer = sharedUserWriter.get();
+        if (writer != null) {
+          writer.close();
+        }
+      } catch (Exception e) {
+        log.warn("Error closing shared user ConditionalWriter", e);
+      }
+    }
     super.close();
   }
 }
diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java
index 777364e0c4..902a8904d6 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java
@@ -69,6 +69,8 @@ public class ConditionalTabletsMutatorImpl implements Ample.ConditionalTabletsMu
   final Map<KeyExtent,Ample.RejectionHandler> rejectedHandlers = new 
HashMap<>();
   private final Map<KeyExtent,Supplier<String>> operationDescriptions = new 
HashMap<>();
   private final Function<DataLevel,String> tableMapper;
+  private final Supplier<ConditionalWriter> sharedMetadataWriter;
+  private final Supplier<ConditionalWriter> sharedUserWriter;
 
   public ConditionalTabletsMutatorImpl(ServerContext context) {
     this(context, DataLevel::metaTable);
@@ -76,8 +78,16 @@ public class ConditionalTabletsMutatorImpl implements Ample.ConditionalTabletsMu
 
   public ConditionalTabletsMutatorImpl(ServerContext context,
       Function<DataLevel,String> tableMapper) {
+    this(context, tableMapper, context.getSharedMetadataWriter(), 
context.getSharedUserWriter());
+  }
+
+  public ConditionalTabletsMutatorImpl(ServerContext context,
+      Function<DataLevel,String> tableMapper, Supplier<ConditionalWriter> 
sharedMetadataWriter,
+      Supplier<ConditionalWriter> sharedUserWriter) {
     this.context = context;
     this.tableMapper = Objects.requireNonNull(tableMapper);
+    this.sharedMetadataWriter = sharedMetadataWriter;
+    this.sharedUserWriter = sharedUserWriter;
   }
 
   @Override
@@ -227,7 +237,24 @@ public class ConditionalTabletsMutatorImpl implements Ample.ConditionalTabletsMu
   public Map<KeyExtent,Ample.ConditionalResult> process() {
     Preconditions.checkState(active);
     if (dataLevel != null) {
-      try (ConditionalWriter conditionalWriter = 
createConditionalWriter(dataLevel)) {
+      ConditionalWriter conditionalWriter = null;
+      boolean shouldClose = false;
+
+      try {
+        if (dataLevel == Ample.DataLevel.ROOT) {
+          conditionalWriter = createConditionalWriter(dataLevel);
+          shouldClose = true;
+        } else {
+          if (dataLevel == Ample.DataLevel.METADATA) {
+            conditionalWriter = sharedMetadataWriter.get();
+          } else if (dataLevel == Ample.DataLevel.USER) {
+            conditionalWriter = sharedUserWriter.get();
+          } else {
+            throw new IllegalArgumentException("Unsupported DataLevel: " + 
dataLevel);
+          }
+          shouldClose = false; // don't close shared writers
+        }
+
         var results = writeMutations(conditionalWriter);
 
         var resultsMap = new HashMap<KeyExtent,ConditionalWriter.Result>();
@@ -300,6 +327,9 @@ public class ConditionalTabletsMutatorImpl implements Ample.ConditionalTabletsMu
       } catch (TableNotFoundException e) {
         throw new RuntimeException(e);
       } finally {
+        if (shouldClose && conditionalWriter != null) {
+          conditionalWriter.close();
+        }
         // render inoperable because reuse is not tested
         extents.clear();
         mutations.clear();
diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
index 8d58aa2d96..e93a06b012 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
@@ -46,6 +46,7 @@ import org.apache.accumulo.core.metadata.StoredTabletFile;
 import org.apache.accumulo.core.metadata.SystemTables;
 import org.apache.accumulo.core.metadata.ValidationUtil;
 import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
 import org.apache.accumulo.core.metadata.schema.AmpleImpl;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.BlipSection;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.DeletesSection;
@@ -92,14 +93,16 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample {
 
   @Override
   public ConditionalTabletsMutator conditionallyMutateTablets() {
-    return new ConditionalTabletsMutatorImpl(context, getTableMapper());
+    return new ConditionalTabletsMutatorImpl(context, getTableMapper(),
+        context.getSharedMetadataWriter(), context.getSharedUserWriter());
   }
 
   @Override
   public AsyncConditionalTabletsMutator
       conditionallyMutateTablets(Consumer<ConditionalResult> resultsConsumer) {
     return new AsyncConditionalTabletsMutatorImpl(resultsConsumer,
-        () -> new ConditionalTabletsMutatorImpl(context, getTableMapper()));
+        () -> new ConditionalTabletsMutatorImpl(context, getTableMapper(),
+            context.getSharedMetadataWriter(), context.getSharedUserWriter()));
   }
 
   private void mutateRootGcCandidates(Consumer<RootGcCandidates> mutator) {
diff --git a/server/base/src/test/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImplTest.java b/server/base/src/test/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImplTest.java
index 8eae5a4b2f..a94daaa45c 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImplTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImplTest.java
@@ -24,68 +24,44 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.function.Function;
-import java.util.stream.StreamSupport;
 
 import org.apache.accumulo.core.client.ConditionalWriter;
-import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.data.ConditionalMutation;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.fate.zookeeper.ZooUtil.LockID;
 import org.apache.accumulo.core.lock.ServiceLock;
 import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.hadoop.io.Text;
 import org.easymock.EasyMock;
 import org.junit.jupiter.api.Test;
 
+import com.google.common.collect.Iterators;
+
 public class ConditionalTabletsMutatorImplTest {
 
   // a conditional tablet mutator that always returns a supplied status
   static class TestConditionalTabletsMutator extends 
ConditionalTabletsMutatorImpl {
 
     private final Map<KeyExtent,TabletMetadata> failedExtents;
-    private final List<Function<Text,ConditionalWriter.Status>> statuses;
-
-    private int attempt = 0;
+    private final ConditionalWriter testWriter;
 
     public TestConditionalTabletsMutator(ServerContext context,
-        List<Function<Text,ConditionalWriter.Status>> statuses,
-        Map<KeyExtent,TabletMetadata> failedExtents) {
-      super(context);
-      this.statuses = statuses;
+        Map<KeyExtent,TabletMetadata> failedExtents, ConditionalWriter 
testWriter) {
+      super(context, DataLevel::metaTable, () -> testWriter, () -> testWriter);
       this.failedExtents = failedExtents;
+      this.testWriter = testWriter;
     }
 
     protected Map<KeyExtent,TabletMetadata> readTablets(List<KeyExtent> 
extents) {
       return failedExtents;
     }
 
-    protected ConditionalWriter createConditionalWriter(Ample.DataLevel 
dataLevel)
-        throws TableNotFoundException {
-      return new ConditionalWriter() {
-        @Override
-        public Iterator<Result> write(Iterator<ConditionalMutation> mutations) 
{
-          Iterable<ConditionalMutation> iterable = () -> mutations;
-          var localAttempt = attempt++;
-          return StreamSupport.stream(iterable.spliterator(), false)
-              .map(cm -> new Result(statuses.get(localAttempt).apply(new 
Text(cm.getRow())), cm,
-                  "server"))
-              .iterator();
-        }
-
-        @Override
-        public Result write(ConditionalMutation mutation) {
-          return write(List.of(mutation).iterator()).next();
-        }
-
-        @Override
-        public void close() {
-
-        }
-      };
+    protected ConditionalWriter createConditionalWriter(Ample.DataLevel 
dataLevel) {
+      return testWriter;
     }
   }
 
@@ -95,6 +71,7 @@ public class ConditionalTabletsMutatorImplTest {
     ServerContext context = EasyMock.createMock(ServerContext.class);
     ServiceLock lock = EasyMock.createMock(ServiceLock.class);
     LockID lid = EasyMock.createMock(LockID.class);
+
     EasyMock.expect(lock.getLockID()).andReturn(lid).anyTimes();
     EasyMock.expect(lid.serialize()).andReturn("/1234").anyTimes();
     EasyMock.expect(context.getServiceLock()).andReturn(lock).anyTimes();
@@ -138,8 +115,26 @@ public class ConditionalTabletsMutatorImplTest {
     var statuses2 = Map.of(ke1.toMetaRow(), ConditionalWriter.Status.REJECTED, 
ke2.toMetaRow(),
         ConditionalWriter.Status.REJECTED);
 
-    try (var mutator = new TestConditionalTabletsMutator(context,
-        List.of(statuses1::get, statuses2::get), failedExtents)) {
+    ConditionalWriter testWriter = new ConditionalWriter() {
+      private int attemptCount = 0;
+
+      @Override
+      public Iterator<Result> write(Iterator<ConditionalMutation> mutations) {
+        var statuses = (attemptCount++ == 0) ? statuses1 : statuses2;
+        return Iterators.transform(mutations,
+            cm -> new Result(statuses.get(new Text(cm.getRow())), cm, 
"server"));
+      }
+
+      @Override
+      public Result write(ConditionalMutation mutation) {
+        return write(List.of(mutation).iterator()).next();
+      }
+
+      @Override
+      public void close() {}
+    };
+
+    try (var mutator = new TestConditionalTabletsMutator(context, 
failedExtents, testWriter)) {
 
       mutator.mutateTablet(ke1).requireAbsentOperation().putDirName("dir1")
           .submit(tmeta -> tmeta.getDirName().equals("dir1"));
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
index ade40a2cf8..c051fea6f5 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
@@ -107,7 +107,6 @@ import org.apache.accumulo.core.util.time.SteadyTime;
 import org.apache.accumulo.harness.SharedMiniClusterBase;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.metadata.AsyncConditionalTabletsMutatorImpl;
-import org.apache.accumulo.server.metadata.ConditionalTabletsMutatorImpl;
 import org.apache.accumulo.test.util.Wait;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
@@ -144,7 +143,7 @@ public class AmpleConditionalWriterIT extends SharedMiniClusterBase {
     try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
       String tableName = getUniqueNames(1)[0];
 
-      SortedSet<Text> splits = new TreeSet<>(List.of(new Text("c"), new 
Text("f"), new Text("j")));
+      SortedSet<Text> splits = new TreeSet<>(Set.of(new Text("c"), new 
Text("f"), new Text("j")));
       c.tableOperations().create(tableName,
           new NewTableConfiguration().withSplits(splits).createOffline());
 
@@ -170,7 +169,7 @@ public class AmpleConditionalWriterIT extends SharedMiniClusterBase {
 
     assertNull(context.getAmple().readTablet(e1).getLocation());
 
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentLocation()
           .putLocation(Location.future(ts1)).submit(tm -> false);
       assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
@@ -178,7 +177,7 @@ public class AmpleConditionalWriterIT extends SharedMiniClusterBase {
     assertEquals(Location.future(ts1), 
context.getAmple().readTablet(e1).getLocation());
 
     // test require absent with a future location set
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentLocation()
           .putLocation(Location.future(ts2)).submit(tm -> false,
               () -> "Testing that requireAbsentLocation() fails when a future 
location is set");
@@ -192,7 +191,7 @@ public class AmpleConditionalWriterIT extends SharedMiniClusterBase {
       assertEquals(List.of(), actual);
     }
 
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       
ctmi.mutateTablet(e1).requireAbsentOperation().requireLocation(Location.future(ts1))
           
.putLocation(Location.current(ts1)).deleteLocation(Location.future(ts1))
           .submit(tm -> false);
@@ -201,7 +200,7 @@ public class AmpleConditionalWriterIT extends SharedMiniClusterBase {
     assertEquals(Location.current(ts1), 
context.getAmple().readTablet(e1).getLocation());
 
     // test require absent with a current location set
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentLocation()
           .putLocation(Location.future(ts2)).submit(tm -> false);
       assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus(),
@@ -216,7 +215,7 @@ public class AmpleConditionalWriterIT extends SharedMiniClusterBase {
       assertEquals(List.of(e1), actual);
     }
 
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       
ctmi.mutateTablet(e1).requireAbsentOperation().requireLocation(Location.future(ts1))
           
.putLocation(Location.current(ts1)).deleteLocation(Location.future(ts1))
           .submit(tm -> false);
@@ -224,7 +223,7 @@ public class AmpleConditionalWriterIT extends SharedMiniClusterBase {
     }
     assertEquals(Location.current(ts1), 
context.getAmple().readTablet(e1).getLocation());
 
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       
ctmi.mutateTablet(e1).requireAbsentOperation().requireLocation(Location.future(ts2))
           
.putLocation(Location.current(ts2)).deleteLocation(Location.future(ts2))
           .submit(tm -> false);
@@ -232,7 +231,7 @@ public class AmpleConditionalWriterIT extends SharedMiniClusterBase {
     }
     assertEquals(Location.current(ts1), 
context.getAmple().readTablet(e1).getLocation());
 
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       
ctmi.mutateTablet(e1).requireAbsentOperation().requireLocation(Location.current(ts1))
           .deleteLocation(Location.current(ts1)).submit(tm -> false);
       assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
@@ -241,7 +240,7 @@ public class AmpleConditionalWriterIT extends SharedMiniClusterBase {
 
     // Set two current locations, this puts the tablet in a bad state as its 
only expected that
     // single location should be set
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       
ctmi.mutateTablet(e1).requireAbsentOperation().putLocation(Location.current(ts1))
           .putLocation(Location.current(ts2)).submit(tm -> false);
       assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
@@ -252,7 +251,7 @@ public class AmpleConditionalWriterIT extends SharedMiniClusterBase {
     // Try to update the tablet requiring one of the locations that is set on 
the tablet. Even
     // though the required location exists, the presence of the other location 
in the tablet
     // metadata should cause the update to fail.
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       
ctmi.mutateTablet(e1).requireAbsentOperation().requireLocation(Location.current(ts1))
           
.deleteLocation(Location.current(ts1)).deleteLocation(Location.current(ts2))
           .submit(tm -> false);
@@ -265,7 +264,7 @@ public class AmpleConditionalWriterIT extends SharedMiniClusterBase {
     assertThrows(IllegalStateException.class, () -> 
context.getAmple().readTablet(e1));
 
     // Requiring an absent location should fail when two locations are set
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentLocation()
           
.deleteLocation(Location.current(ts1)).deleteLocation(Location.current(ts2))
           .submit(tm -> false);
@@ -275,7 +274,7 @@ public class AmpleConditionalWriterIT extends SharedMiniClusterBase {
     assertThrows(IllegalStateException.class, () -> 
context.getAmple().readTablet(e1));
 
     // Change the tablet to have a future and current location set.
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       
ctmi.mutateTablet(e1).requireAbsentOperation().deleteLocation(Location.current(ts1))
           .putLocation(Location.future(ts1)).submit(tm -> false);
       assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
@@ -288,7 +287,7 @@ public class AmpleConditionalWriterIT extends SharedMiniClusterBase {
     // required location does not exist.
     for (var loc : List.of(Location.current(ts1), Location.current(ts2), 
Location.future(ts1),
         Location.future(ts2))) {
-      try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+      try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
         ctmi.mutateTablet(e1).requireAbsentOperation().requireLocation(loc)
             
.deleteLocation(Location.future(ts1)).deleteLocation(Location.current(ts2))
             .submit(tm -> false);
@@ -298,7 +297,7 @@ public class AmpleConditionalWriterIT extends SharedMiniClusterBase {
     }
 
     // Requiring an absent location should fail when a future and current 
location are set
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentLocation()
           
.deleteLocation(Location.current(ts1)).deleteLocation(Location.current(ts2))
           .submit(tm -> false);
@@ -308,7 +307,7 @@ public class AmpleConditionalWriterIT extends SharedMiniClusterBase {
     assertThrows(IllegalStateException.class, () -> 
context.getAmple().readTablet(e1));
 
     // Delete one of the locations w/o any location requirements, this should 
succeed.
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       
ctmi.mutateTablet(e1).requireAbsentOperation().deleteLocation(Location.future(ts1))
           .submit(tm -> false);
       assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
@@ -340,7 +339,7 @@ public class AmpleConditionalWriterIT extends SharedMiniClusterBase {
     System.out.println(context.getAmple().readTablet(e1).getLocation());
 
     // simulate a compaction where the tablet location is not set
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       var tm1 = TabletMetadata.builder(e1).putFile(stf1, dfv).putFile(stf2, 
dfv).putFile(stf3, dfv)
           .build();
       ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm1, FILES)
@@ -352,7 +351,7 @@ public class AmpleConditionalWriterIT extends SharedMiniClusterBase {
     var tm2 = 
TabletMetadata.builder(e1).putLocation(Location.current(ts1)).build();
     // simulate minor compacts where the tablet location is not set
     for (StoredTabletFile file : List.of(stf1, stf2, stf3)) {
-      try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+      try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
         ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm2, 
LOCATION)
             .putFile(file, new DataFileValue(0, 0)).submit(tm -> false);
         assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus());
@@ -363,7 +362,7 @@ public class AmpleConditionalWriterIT extends SharedMiniClusterBase {
 
     // set the location
     var tm3 = TabletMetadata.builder(e1).build(LOCATION);
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm3, LOCATION)
           .putLocation(Location.current(ts1)).submit(tm -> false);
       assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
@@ -372,7 +371,7 @@ public class AmpleConditionalWriterIT extends SharedMiniClusterBase {
     var tm4 = 
TabletMetadata.builder(e1).putLocation(Location.current(ts2)).build();
     // simulate minor compacts where the tablet location is wrong
     for (StoredTabletFile file : List.of(stf1, stf2, stf3)) {
-      try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+      try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
         ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm4, 
LOCATION)
             .putFile(file, new DataFileValue(0, 0)).submit(tm -> false);
         assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus());
@@ -383,7 +382,7 @@ public class AmpleConditionalWriterIT extends SharedMiniClusterBase {
 
     // simulate minor compacts where the tablet location is set
     for (StoredTabletFile file : List.of(stf1, stf2, stf3)) {
-      try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+      try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
         ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm2, 
LOCATION)
             .putFile(file, new DataFileValue(0, 0)).submit(tm -> false);
         assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
@@ -397,7 +396,7 @@ public class AmpleConditionalWriterIT extends SharedMiniClusterBase {
         TabletMetadata.builder(e1).putFile(stf1, dfv).putFile(stf2, 
dfv).build(),
         TabletMetadata.builder(e1).putFile(stf1, dfv).putFile(stf2, 
dfv).putFile(stf3, dfv)
             .putFile(stf4, dfv).build())) {
-      try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+      try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
         ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta, 
FILES)
             .putFile(stf4, new DataFileValue(0, 
0)).deleteFile(stf1).deleteFile(stf2)
             .deleteFile(stf3).submit(tm -> false);
@@ -409,7 +408,7 @@ public class AmpleConditionalWriterIT extends SharedMiniClusterBase {
     // simulate a compaction
     var tm5 =
         TabletMetadata.builder(e1).putFile(stf1, dfv).putFile(stf2, 
dfv).putFile(stf3, dfv).build();
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm5, FILES)
           .putFile(stf4, new DataFileValue(0, 
0)).deleteFile(stf1).deleteFile(stf2).deleteFile(stf3)
           .submit(tm -> false);
@@ -422,7 +421,7 @@ public class AmpleConditionalWriterIT extends SharedMiniClusterBase {
     var compactedDv = new DataFileValue(0, 0);
     tm5 = TabletMetadata.builder(e1).putFile(stf1, compactedDv).putFile(stf2, 
compactedDv)
         .putFile(stf3, compactedDv).build();
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm5, FILES)
           .putFile(stf4, new DataFileValue(0, 
0)).deleteFile(stf1).deleteFile(stf2).deleteFile(stf3)
           .submit(tm -> false);
@@ -436,7 +435,7 @@ public class AmpleConditionalWriterIT extends SharedMiniClusterBase {
     var tm6 = TabletMetadata.builder(e1).build(LOADED);
     FateInstanceType type = FateInstanceType.fromTableId(tid);
     FateId fateId = FateId.from(type, UUID.randomUUID());
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm6, LOADED)
           .putFile(stf5, new DataFileValue(0, 
0)).putBulkFile(stf5.getTabletFile(), fateId)
           .submit(tm -> false);
@@ -449,7 +448,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
         .of(new 
Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/A0000075.rf"));
     var tm7 =
         TabletMetadata.builder(e1).putFile(stf4, compactedDv).putFile(stf5, 
compactedDv).build();
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm7, FILES)
           .putFile(stf6, new DataFileValue(0, 
0)).deleteFile(stf4).deleteFile(stf5)
           .submit(tm -> false);
@@ -458,7 +457,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
     assertEquals(Set.of(stf6), context.getAmple().readTablet(e1).getFiles());
 
     // simulate trying to re bulk import file after a compaction
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm6, LOADED)
           .putFile(stf5, new DataFileValue(0, 
0)).putBulkFile(stf5.getTabletFile(), fateId)
           .submit(tm -> false);
@@ -480,7 +479,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
     // tablet should not have any logs to start with so requireSame with the 
empty logs should pass
     assertTrue(context.getAmple().readTablet(e1).getLogs().isEmpty());
     Set<LogEntry> expectedLogs = new HashSet<>();
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tmEmptySet, 
LOGS)
           .putWal(originalLogEntry).submit(tm -> false);
       assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
@@ -493,7 +492,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
     String walFilePath2 =
         java.nio.file.Path.of("tserver+8080", 
UUID.randomUUID().toString()).toString();
     LogEntry newLogEntry = LogEntry.fromPath(walFilePath2);
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       
ctmi.mutateTablet(e1).requireAbsentOperation().putWal(newLogEntry).submit(tm -> 
false);
       assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
     }
@@ -522,7 +521,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
       subset.forEach(builder::putWal);
       TabletMetadata tmSubset = builder.build(LOGS);
 
-      try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+      try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
         ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tmSubset, 
LOGS)
             .deleteWal(originalLogEntry).submit(t -> false);
         assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus());
@@ -537,7 +536,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
     // this example)
     TabletMetadata tm2 =
         
TabletMetadata.builder(e1).putWal(originalLogEntry).putWal(newLogEntry).build(LOGS);
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm2, LOGS)
           .deleteWal(originalLogEntry).submit(tm -> false);
       assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus(),
@@ -549,7 +548,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
 
     // test the requireAbsentLogs() function when logs are not present in the 
tablet metadata
     assertTrue(context.getAmple().readTablet(e2).getLogs().isEmpty());
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       
ctmi.mutateTablet(e2).requireAbsentOperation().requireAbsentLogs().putWal(originalLogEntry)
           .submit(tm -> false);
       assertEquals(Status.ACCEPTED, ctmi.process().get(e2).getStatus());
@@ -557,7 +556,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
 
     // test the requireAbsentLogs() function when logs are present in the 
tablet metadata
     assertFalse(context.getAmple().readTablet(e2).getLogs().isEmpty());
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       
ctmi.mutateTablet(e2).requireAbsentOperation().requireAbsentLogs().deleteWal(originalLogEntry)
           .submit(tm -> false);
       assertEquals(Status.REJECTED, ctmi.process().get(e2).getStatus());
@@ -583,7 +582,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
 
     // simulate a compaction where the tablet location is not set
     var tm1 = TabletMetadata.builder(e1).build(FILES, SELECTED);
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm1, 
FILES).putFile(stf1, dfv)
           .putFile(stf2, dfv).putFile(stf3, dfv).submit(tm -> false);
       assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
@@ -594,7 +593,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
     FateId fateId1 = FateId.from(type, UUID.randomUUID());
     FateId fateId2 = FateId.from(type, UUID.randomUUID());
     var time = SteadyTime.from(100_100, TimeUnit.NANOSECONDS);
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm1, FILES, 
SELECTED)
           .putSelectedFiles(new SelectedFiles(Set.of(stf1, stf2, stf3), true, 
fateId1, time))
           .submit(tm -> false);
@@ -605,7 +604,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
 
     var tm2 = TabletMetadata.builder(e1).putFile(stf1, dfv).putFile(stf2, 
dfv).putFile(stf3, dfv)
         .build(SELECTED);
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm2, FILES, 
SELECTED)
           .putSelectedFiles(new SelectedFiles(Set.of(stf1, stf2, stf3), true, 
fateId1, time))
           .submit(tm -> false);
@@ -627,7 +626,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
     for (var selectedFiles : expectedToFail) {
       var tm3 = TabletMetadata.builder(e1).putFile(stf1, dfv).putFile(stf2, 
dfv).putFile(stf3, dfv)
           .putSelectedFiles(selectedFiles).build();
-      try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+      try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
         ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm3, FILES, 
SELECTED)
             .deleteSelectedFiles().submit(tm -> false);
         assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus());
@@ -639,7 +638,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
 
     var tm5 = TabletMetadata.builder(e1).putFile(stf1, dfv).putFile(stf2, 
dfv).putFile(stf3, dfv)
         .putSelectedFiles(new SelectedFiles(Set.of(stf1, stf2, stf3), true, 
fateId1, time)).build();
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm5, FILES, 
SELECTED)
           .deleteSelectedFiles().submit(tm -> false);
       assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
@@ -670,7 +669,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
       final SelectedFiles selectedFiles =
           new SelectedFiles(storedTabletFiles, initiallySelectedAll, fateId, 
time);
 
-      try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+      try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
         // write the SelectedFiles to the keyextent
         
ctmi.mutateTablet(e1).requireAbsentOperation().putSelectedFiles(selectedFiles)
             .submit(tm -> false);
@@ -736,7 +735,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
       // submit a mutation with the condition that the selected files match 
what was originally
       // written
       DataFileValue dfv = new DataFileValue(100, 100);
-      try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+      try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
         ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm1, 
SELECTED).putFile(stf4, dfv)
             .submit(tm -> false);
         // with a SortedFilesIterator attached to the Condition for SELECTED, 
the metadata should be
@@ -775,7 +774,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
 
     var context = getCluster().getServerContext();
 
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentLocation()
           .putLocation(Location.future(ts1)).submit(tm -> false);
       ctmi.mutateTablet(e2).requireAbsentOperation().requireAbsentLocation()
@@ -795,7 +794,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
 
     // in addition to testing multiple tablets also testing 
requireAbsentTablet() which is
     // currently not called elsewhere in this IT
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentLocation()
           .putLocation(Location.future(ts2)).submit(tm -> false);
       
ctmi.mutateTablet(e2).requireAbsentTablet().putLocation(Location.future(ts1))
@@ -832,7 +831,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
     var opid1 = TabletOperationId.from(TabletOperationType.SPLITTING, fateId1);
     var opid2 = TabletOperationId.from(TabletOperationType.MERGING, fateId2);
 
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       
ctmi.mutateTablet(e1).requireAbsentOperation().putOperation(opid1).submit(tm -> 
false);
       
ctmi.mutateTablet(e2).requireAbsentOperation().putOperation(opid2).submit(tm -> 
false);
       
ctmi.mutateTablet(e3).requireOperation(opid1).deleteOperation().submit(tm -> 
false);
@@ -849,7 +848,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
     assertEquals(opid2, context.getAmple().readTablet(e2).getOperationId());
     assertNull(context.getAmple().readTablet(e3).getOperationId());
 
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       
ctmi.mutateTablet(e1).requireOperation(opid2).deleteOperation().submit(tm -> 
false);
       
ctmi.mutateTablet(e2).requireOperation(opid1).deleteOperation().submit(tm -> 
false);
       var results = ctmi.process();
@@ -861,7 +860,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
     assertEquals(TabletOperationType.MERGING,
         context.getAmple().readTablet(e2).getOperationId().getType());
 
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       
ctmi.mutateTablet(e1).requireOperation(opid1).deleteOperation().submit(tm -> 
false);
       
ctmi.mutateTablet(e2).requireOperation(opid2).deleteOperation().submit(tm -> 
false);
       var results = ctmi.process();
@@ -897,7 +896,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
 
     // Test different compaction requirement scenarios for tablets w/o any 
compactions in the
     // metadata table
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMetaNoCompactions,
 ECOMP)
           .putExternalCompaction(ecid1, ecMeta).submit(tm -> false);
       ctmi.mutateTablet(e2).requireAbsentOperation().requireCompaction(ecid2)
@@ -922,7 +921,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
 
     // Test compaction requirements that do not match the compaction ids that 
exists in the
     // metadata table.
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation().requireCompaction(ecid2)
           .deleteExternalCompaction(ecid1).submit(tm -> false);
       
ctmi.mutateTablet(e3).requireAbsentOperation().requireSame(tabletMetaCompactions,
 ECOMP)
@@ -938,7 +937,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
 
     // Test compaction requirements that match the compaction ids that exists 
in the metadata
     // table.
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMetaCompactions,
 ECOMP)
           .deleteExternalCompaction(ecid1).submit(tm -> false);
       ctmi.mutateTablet(e3).requireAbsentOperation().requireCompaction(ecid2)
@@ -965,7 +964,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
 
     var tabletMeta1 = TabletMetadata.builder(e1).build(COMPACTED);
 
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1, 
COMPACTED)
           .putCompacted(fateId2)
           .submit(tabletMetadata -> 
tabletMetadata.getCompacted().contains(fateId2));
@@ -981,7 +980,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
     assertEquals(Set.of(fateId2), tabletMeta1.getCompacted());
     assertEquals(Set.of(), context.getAmple().readTablet(e2).getCompacted());
 
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1, 
COMPACTED)
           .putCompacted(fateId4).putCompacted(fateId5).submit(tabletMetadata 
-> false);
       assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
@@ -992,7 +991,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
     // test require same with a superset
     tabletMeta1 = 
TabletMetadata.builder(e2).putCompacted(fateId2).putCompacted(fateId4)
         .putCompacted(fateId5).putCompacted(fateId1).build(COMPACTED);
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1, 
COMPACTED)
           
.deleteCompacted(fateId2).deleteCompacted(fateId4).deleteCompacted(fateId5)
           .submit(tabletMetadata -> false);
@@ -1004,7 +1003,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
     // test require same with a subset
     tabletMeta1 =
         
TabletMetadata.builder(e2).putCompacted(fateId2).putCompacted(fateId4).build(COMPACTED);
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1, 
COMPACTED)
           
.deleteCompacted(fateId2).deleteCompacted(fateId4).deleteCompacted(fateId5)
           .submit(tabletMetadata -> false);
@@ -1016,7 +1015,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
     // now use the exact set the tablet has
     tabletMeta1 = 
TabletMetadata.builder(e2).putCompacted(fateId2).putCompacted(fateId4)
         .putCompacted(fateId5).build(COMPACTED);
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1, 
COMPACTED)
           
.deleteCompacted(fateId2).deleteCompacted(fateId4).deleteCompacted(fateId5)
           .submit(tabletMetadata -> false);
@@ -1039,14 +1038,14 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
     TabletOperationId opid =
         TabletOperationId.from(TabletOperationType.MERGING, FateId.from(type, 
UUID.randomUUID()));
 
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       
ctmi.mutateTablet(RootTable.EXTENT).requireAbsentOperation().requireAbsentLocation()
           .putOperation(opid).submit(tm -> false);
       assertEquals(Status.REJECTED, 
ctmi.process().get(RootTable.EXTENT).getStatus());
     }
     
assertNull(context.getAmple().readTablet(RootTable.EXTENT).getOperationId());
 
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(RootTable.EXTENT).requireAbsentOperation()
           
.requireLocation(Location.future(loc.getServerInstance())).putOperation(opid)
           .submit(tm -> false);
@@ -1054,7 +1053,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
     }
     
assertNull(context.getAmple().readTablet(RootTable.EXTENT).getOperationId());
 
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(RootTable.EXTENT).requireAbsentOperation()
           
.requireLocation(Location.current(loc.getServerInstance())).putOperation(opid)
           .submit(tm -> false);
@@ -1065,7 +1064,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
 
     Wait.waitFor(() -> 
context.getAmple().readTablet(RootTable.EXTENT).getLocation() == null);
 
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       
ctmi.mutateTablet(RootTable.EXTENT).requireOperation(opid).requireAbsentLocation()
           
.putLocation(Location.future(loc.getServerInstance())).deleteOperation()
           .submit(tm -> false);
@@ -1083,7 +1082,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
     for (var time : List.of(new MetadataTime(100, TimeType.LOGICAL),
         new MetadataTime(100, TimeType.MILLIS), new MetadataTime(0, 
TimeType.LOGICAL))) {
       var tabletMeta1 = TabletMetadata.builder(e1).putTime(time).build();
-      try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+      try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
         
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1, TIME)
             .putTime(new MetadataTime(101, 
TimeType.LOGICAL)).submit(tabletMetadata -> false);
         assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus());
@@ -1095,7 +1094,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
     for (int i = 0; i < 10; i++) {
       var tabletMeta1 =
           TabletMetadata.builder(e1).putTime(new MetadataTime(i, 
TimeType.MILLIS)).build();
-      try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+      try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
         
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1, TIME)
             .putTime(new MetadataTime(i + 1, 
TimeType.MILLIS)).submit(tabletMetadata -> false);
         assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
@@ -1117,7 +1116,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
     FateId fateId5 = FateId.from(type, UUID.randomUUID());
     var tabletMeta1 = 
TabletMetadata.builder(e1).build(USER_COMPACTION_REQUESTED);
 
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation()
           .requireSame(tabletMeta1, 
USER_COMPACTION_REQUESTED).putUserCompactionRequested(fateId2)
           .submit(tabletMetadata -> 
tabletMetadata.getUserCompactionsRequested().contains(fateId2));
@@ -1134,7 +1133,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
     assertEquals(Set.of(fateId2), tabletMeta1.getUserCompactionsRequested());
     assertEquals(Set.of(), 
context.getAmple().readTablet(e2).getUserCompactionsRequested());
 
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation()
           .requireSame(tabletMeta1, 
USER_COMPACTION_REQUESTED).putUserCompactionRequested(fateId4)
           .putUserCompactionRequested(fateId5).submit(tabletMetadata -> false);
@@ -1147,7 +1146,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
     tabletMeta1 = 
TabletMetadata.builder(e2).putUserCompactionRequested(fateId2)
         
.putUserCompactionRequested(fateId4).putUserCompactionRequested(fateId5)
         .putUserCompactionRequested(fateId1).build(USER_COMPACTION_REQUESTED);
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation()
           .requireSame(tabletMeta1, USER_COMPACTION_REQUESTED)
           
.deleteUserCompactionRequested(fateId2).deleteUserCompactionRequested(fateId4)
@@ -1160,7 +1159,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
     // test require same with a subset
     tabletMeta1 = 
TabletMetadata.builder(e2).putUserCompactionRequested(fateId2)
         .putUserCompactionRequested(fateId4).build(USER_COMPACTION_REQUESTED);
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation()
           .requireSame(tabletMeta1, USER_COMPACTION_REQUESTED)
           
.deleteUserCompactionRequested(fateId2).deleteUserCompactionRequested(fateId4)
@@ -1174,7 +1173,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
     tabletMeta1 = 
TabletMetadata.builder(e2).putUserCompactionRequested(fateId2)
         
.putUserCompactionRequested(fateId4).putUserCompactionRequested(fateId5)
         .build(USER_COMPACTION_REQUESTED);
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation()
           .requireSame(tabletMeta1, USER_COMPACTION_REQUESTED)
           
.deleteUserCompactionRequested(fateId2).deleteUserCompactionRequested(fateId4)
@@ -1244,7 +1243,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
       for (KeyExtent ke : tabletsWithWalCompactFlush) {
         FateInstanceType type = FateInstanceType.fromTableId(ke.tableId());
         FateId fateId = FateId.from(type, UUID.randomUUID());
-        try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+        try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
           ctmi.mutateTablet(ke).requireAbsentOperation().putCompacted(fateId)
               .putFlushId(TestTabletMetadataFilter.VALID_FLUSH_ID).putWal(wal)
               .submit(tabletMetadata -> false);
@@ -1260,7 +1259,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
       // on a subset of the tablets, put a location
       final Set<KeyExtent> tabletsWithLocation = Set.of(e2, e3, e4);
       for (KeyExtent ke : tabletsWithLocation) {
-        try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+        try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
           
ctmi.mutateTablet(ke).requireAbsentOperation().requireAbsentLocation()
               
.putLocation(Location.current(serverInstance)).submit(tabletMetadata -> false);
           assertEquals(Status.ACCEPTED, ctmi.process().get(ke).getStatus());
@@ -1291,7 +1290,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
       testFilterApplied(context, Set.of(), Set.of(e1, e2, e3, e4),
           "Initially, all tablets should be present");
 
-      try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+      try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
         // Set compacted on e2 but with no flush ID
         ctmi.mutateTablet(e2).requireAbsentOperation().putCompacted(fateId1)
             .submit(tabletMetadata -> false);
@@ -1301,7 +1300,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
           "Compacted but no flush ID should return no tablets");
 
       // Set incorrect flush ID on e2
-      try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+      try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
         ctmi.mutateTablet(e2).requireAbsentOperation().putFlushId(45L)
             .submit(tabletMetadata -> false);
         assertEquals(Status.ACCEPTED, ctmi.process().get(e2).getStatus());
@@ -1310,7 +1309,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
           "Compacted with incorrect flush ID should return no tablets");
 
       // Set correct flush ID on e2
-      try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+      try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
         ctmi.mutateTablet(e2).requireAbsentOperation()
             
.putFlushId(TestTabletMetadataFilter.VALID_FLUSH_ID).submit(tabletMetadata -> 
false);
         assertEquals(Status.ACCEPTED, ctmi.process().get(e2).getStatus());
@@ -1319,7 +1318,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
           "Compacted with correct flush ID should return e2");
 
       // Set compacted and correct flush ID on e3
-      try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+      try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
         ctmi.mutateTablet(e3).requireAbsentOperation().putCompacted(fateId2)
             
.putFlushId(TestTabletMetadataFilter.VALID_FLUSH_ID).submit(tabletMetadata -> 
false);
         assertEquals(Status.ACCEPTED, ctmi.process().get(e3).getStatus());
@@ -1342,7 +1341,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
       var walFilePath =
           java.nio.file.Path.of("tserver+8080", 
UUID.randomUUID().toString()).toString();
       var wal = LogEntry.fromPath(walFilePath);
-      try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+      try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
         
ctmi.mutateTablet(e2).requireAbsentOperation().putWal(wal).submit(tabletMetadata
 -> false);
         assertEquals(Status.ACCEPTED, ctmi.process().get(e2).getStatus());
       }
@@ -1353,7 +1352,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
       // add wal to tablet e4
       walFilePath = java.nio.file.Path.of("tserver+8080", 
UUID.randomUUID().toString()).toString();
       wal = LogEntry.fromPath(walFilePath);
-      try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+      try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
         
ctmi.mutateTablet(e4).requireAbsentOperation().putWal(wal).submit(tabletMetadata
 -> false);
         assertEquals(Status.ACCEPTED, ctmi.process().get(e4).getStatus());
       }
@@ -1363,7 +1362,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
           "Only tablets with wals should be returned");
 
       // remove the wal from e4
-      try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+      try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
         ctmi.mutateTablet(e4).requireAbsentOperation().deleteWal(wal)
             .submit(tabletMetadata -> false);
         assertEquals(Status.ACCEPTED, ctmi.process().get(e4).getStatus());
@@ -1375,7 +1374,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
       var ts1 = new TServerInstance("localhost:9997", 5000L);
       var ts2 = new TServerInstance("localhost:9997", 6000L);
 
-      try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+      try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
         ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentLocation()
             .putLocation(Location.future(ts1)).submit(tabletMetadata -> false);
         ctmi.mutateTablet(e2).requireAbsentOperation().requireAbsentLocation()
@@ -1432,7 +1431,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
 
     var tabletMeta1 = TabletMetadata.builder(e1).putFlushId(42L).build();
     assertTrue(tabletMeta1.getFlushId().isPresent());
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1, 
FLUSH_ID)
           .putFlushId(43L).submit(tabletMetadata -> 
tabletMetadata.getFlushId().orElse(-1) == 43L);
       assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus());
@@ -1441,7 +1440,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
 
     var tabletMeta2 = TabletMetadata.builder(e1).build(FLUSH_ID);
     assertFalse(tabletMeta2.getFlushId().isPresent());
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta2, 
FLUSH_ID)
           .putFlushId(43L).submit(tabletMetadata -> 
tabletMetadata.getFlushId().orElse(-1) == 43L);
       assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
@@ -1450,14 +1449,14 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
 
     var tabletMeta3 = TabletMetadata.builder(e1).putFlushId(43L).build();
     assertTrue(tabletMeta3.getFlushId().isPresent());
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta3, 
FLUSH_ID)
           .putFlushId(44L).submit(tabletMetadata -> 
tabletMetadata.getFlushId().orElse(-1) == 44L);
       assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
     }
     assertEquals(44L, 
context.getAmple().readTablet(e1).getFlushId().getAsLong());
 
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta3, 
FLUSH_ID)
           .putFlushId(45L).submit(tabletMetadata -> 
tabletMetadata.getFlushId().orElse(-1) == 45L);
       assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus());
@@ -1546,7 +1545,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
     assertEquals(TabletAvailability.ONDEMAND,
         context.getAmple().readTablet(e2).getTabletAvailability());
 
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation()
           .requireTabletAvailability(TabletAvailability.HOSTED)
           .putTabletAvailability(TabletAvailability.UNHOSTED).submit(tm -> 
false);
@@ -1571,7 +1570,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
     // require the UNSPLITTABLE column to be absent when it is absent
     var usm1 = UnSplittableMetadata.toUnSplittable(e1, 1000, 100000, 32, 
Set.of());
 
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1, 
UNSPLITTABLE)
           .setUnSplittable(usm1).submit(tm -> false);
       assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
@@ -1580,7 +1579,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
     assertEquals(usm1.toBase64(), tabletMeta2.getUnSplittable().toBase64());
 
     // require the UNSPLITTABLE column to be absent when it is not absent
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1, 
UNSPLITTABLE)
           .deleteUnSplittable().submit(tm -> false);
       assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus());
@@ -1590,7 +1589,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
     var usm2 = UnSplittableMetadata.toUnSplittable(e1, 1001, 100001, 33, 
Set.of());
     var tabletMeta3 = 
TabletMetadata.builder(e1).setUnSplittable(usm2).build(UNSPLITTABLE);
     // require the UNSPLITTABLE column to be usm2 when it is actually usm1
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta3, 
UNSPLITTABLE)
           .deleteUnSplittable().submit(tm -> false);
       assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus());
@@ -1598,7 +1597,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
     assertEquals(usm1.toBase64(), 
context.getAmple().readTablet(e1).getUnSplittable().toBase64());
 
     // require the UNSPLITTABLE column to be usm1 when it is actually usm1
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta2, 
UNSPLITTABLE)
           .deleteUnSplittable().submit(tm -> false);
       assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
@@ -1606,7 +1605,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
     assertNull(context.getAmple().readTablet(e1).getUnSplittable());
 
     // require the UNSPLITTABLE column to be usm1 when it is actually absent
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta2, 
UNSPLITTABLE)
           .setUnSplittable(usm2).submit(tm -> false);
       assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus());
@@ -1618,7 +1617,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
   public void testErrors() {
     var context = getCluster().getServerContext();
 
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentTablet().putDirName("d1").submit(tm 
-> false);
       // making multiple updates for the same tablet is not supported
       assertThrows(IllegalStateException.class,
@@ -1630,7 +1629,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
               .requireSame(tabletMetadata, 
HOSTING_REQUESTED).putDirName("d2").submit(tm -> false));
     }
 
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       // the following end prev end row update should cause a constraint 
violation because a
       // tablet's prev end row must be less than its end row
       
ctmi.mutateTablet(e1).requireAbsentOperation().putPrevEndRow(e1.endRow()).submit(tm
 -> false);
@@ -1638,7 +1637,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
       assertThrows(IllegalStateException.class, () -> 
ctmi.process().get(e1).getStatus());
     }
 
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       
ctmi.mutateTablet(e1).requireAbsentOperation().putFlushNonce(1234).submit(tm -> 
false);
       assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
 
@@ -1663,7 +1662,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
     var dfv = new DataFileValue(100, 100);
 
     // Add 3 of the files, skip the 4th file
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation().putFile(stf1, 
dfv).putFile(stf2, dfv)
           .putFile(stf3, dfv).submit(tm -> false);
       assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
@@ -1672,7 +1671,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
 
     // Test mutation is accepted when given all files
     var time1 = MetadataTime.parse("L50");
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation().requireFiles(Set.of(stf1, 
stf2, stf3))
           .putTime(time1).submit(tm -> false);
       assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
@@ -1681,7 +1680,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
 
     // Test mutation is accepted when a subset of files is given
     var time2 = MetadataTime.parse("L60");
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation().requireFiles(Set.of(stf1, 
stf3)).putTime(time2)
           .submit(tm -> false);
       assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
@@ -1690,7 +1689,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
 
     // Test mutation is rejected when a file is given that the tablet does not 
have
     var time3 = MetadataTime.parse("L70");
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation().requireFiles(Set.of(stf1, 
stf4)).putTime(time3)
           .submit(tm -> false);
       assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus());
@@ -1714,7 +1713,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
     var dfv = new DataFileValue(100, 100);
 
     // Add 3 of the files, skip the 4th file
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation().putFile(stf1, 
dfv).putFile(stf2, dfv)
           .putFile(stf3, dfv).submit(tm -> false);
       assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
@@ -1723,7 +1722,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
 
     // Test mutation is accepted when # files in tablet equals limit
     var time1 = MetadataTime.parse("L50");
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       
ctmi.mutateTablet(e1).requireAbsentOperation().requireLessOrEqualsFiles(3).putTime(time1)
           .submit(tm -> false);
       assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
@@ -1732,7 +1731,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
 
     // Test mutation is accepted when # files in tablet is less than limit
     var time2 = MetadataTime.parse("L60");
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       
ctmi.mutateTablet(e1).requireAbsentOperation().requireLessOrEqualsFiles(4).putTime(time2)
           .submit(tm -> false);
       assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
@@ -1741,7 +1740,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
 
     // Test mutation is rejected when # files in tablet is greater than limit
     var time3 = MetadataTime.parse("L70");
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       
ctmi.mutateTablet(e1).requireAbsentOperation().requireLessOrEqualsFiles(2).putTime(time3)
           .submit(tm -> false);
       assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus());
@@ -1750,14 +1749,14 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
     assertEquals(time2, context.getAmple().readTablet(e1).getTime());
 
     // add fourth file
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation().putFile(stf4, 
dfv).submit(tm -> false);
       assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
     }
     assertEquals(Set.of(stf1, stf2, stf3, stf4), 
context.getAmple().readTablet(e1).getFiles());
 
     // Test mutation is rejected when # files in tablet is greater than limit
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       
ctmi.mutateTablet(e1).requireAbsentOperation().requireLessOrEqualsFiles(3).putTime(time3)
           .submit(tm -> false);
       assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus());
@@ -1767,7 +1766,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
 
     // Test mutation is accepted when # files in tablet equals limit
     var time4 = MetadataTime.parse("L80");
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       
ctmi.mutateTablet(e1).requireAbsentOperation().requireLessOrEqualsFiles(4).putTime(time4)
           .submit(tm -> false);
       assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
@@ -1790,7 +1789,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
 
     FateId fateId1 = FateId.from(FateInstanceType.USER, UUID.randomUUID());
 
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation()
           .requireAbsentLoaded(Set.of(stf1.getTabletFile(), 
stf2.getTabletFile()))
           .putBulkFile(stf1.getTabletFile(), 
fateId1).putBulkFile(stf2.getTabletFile(), fateId1)
@@ -1803,7 +1802,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
 
     FateId fateId2 = FateId.from(FateInstanceType.USER, UUID.randomUUID());
 
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation()
           .requireAbsentLoaded(Set.of(stf3.getTabletFile()))
           .putBulkFile(stf3.getTabletFile(), fateId2).putFile(stf3, 
dfv).submit(tm -> false);
@@ -1814,7 +1813,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
         context.getAmple().readTablet(e1).getLoaded());
 
     // should fail because the loaded markers are present
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation()
           .requireAbsentLoaded(Set.of(stf1.getTabletFile(), 
stf2.getTabletFile()))
           .putBulkFile(stf1.getTabletFile(), 
fateId1).putBulkFile(stf2.getTabletFile(), fateId1)
@@ -1823,7 +1822,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
     }
 
     // should fail because the loaded markers are present
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation()
           .requireAbsentLoaded(Set.of(stf3.getTabletFile()))
           .putBulkFile(stf3.getTabletFile(), fateId2).putFile(stf3, 
dfv).putFlushId(99)
@@ -1867,14 +1866,14 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
         .of(new 
Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000072.rf"));
     var dfv = new DataFileValue(100, 100);
 
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation().putFile(stf1, 
dfv).putFile(stf2, dfv)
           .submit(tm -> false);
       assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
     }
     assertEquals(Set.of(stf1, stf2), 
context.getAmple().readTablet(e1).getFiles());
 
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation()
           .requireCheckSuccess(new TestCheck(Set.of(stf1, 
stf3))).putFile(stf3, dfv)
           .submit(tm -> false);
@@ -1882,7 +1881,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
     }
     assertEquals(Set.of(stf1, stf2), 
context.getAmple().readTablet(e1).getFiles());
 
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation()
           .requireCheckSuccess(new TestCheck(Set.of(stf1, 
stf2))).putFile(stf3, dfv)
           .submit(tm -> false);
@@ -1890,7 +1889,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
     }
     assertEquals(Set.of(stf1, stf2, stf3), 
context.getAmple().readTablet(e1).getFiles());
 
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation()
           .requireCheckSuccess(new TestCheck(Set.of(stf1, 
stf3))).deleteFile(stf2)
           .submit(tm -> false);
@@ -1898,7 +1897,7 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
     }
     assertEquals(Set.of(stf1, stf3), 
context.getAmple().readTablet(e1).getFiles());
 
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       ctmi.mutateTablet(e1).requireAbsentOperation()
           .requireCheckSuccess(new TestCheck(Set.of(stf1, 
stf2))).deleteFile(stf1)
           .submit(tm -> false);
@@ -1913,27 +1912,27 @@ public class AmpleConditionalWriterIT extends 
SharedMiniClusterBase {
     var tsi = new TServerInstance("localhost:1234", 56L);
     var otherTsi = new TServerInstance("localhost:9876", 54L);
 
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       
ctmi.mutateTablet(e1).requireAbsentOperation().requireMigration(tsi).deleteMigration()
           .submit(tm -> false);
       assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus());
     }
     assertNull(context.getAmple().readTablet(e1).getMigration());
 
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       
ctmi.mutateTablet(e1).requireAbsentOperation().putMigration(tsi).submit(tm -> 
false);
       assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
     }
     assertEquals(tsi, context.getAmple().readTablet(e1).getMigration());
 
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       
ctmi.mutateTablet(e1).requireAbsentOperation().requireMigration(otherTsi).deleteMigration()
           .submit(tm -> false);
       assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus());
     }
     assertEquals(tsi, context.getAmple().readTablet(e1).getMigration());
 
-    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+    try (var ctmi = context.getAmple().conditionallyMutateTablets()) {
       
ctmi.mutateTablet(e1).requireAbsentOperation().requireMigration(tsi).deleteMigration()
           .submit(tm -> false);
       assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());

Reply via email to