This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new a131e97052 updates delete table to use conditional muations (#3929) a131e97052 is described below commit a131e9705293ef7166cfce3fbdae9f40201430b8 Author: Keith Turner <ktur...@apache.org> AuthorDate: Mon Nov 6 13:18:27 2023 -0500 updates delete table to use conditional muations (#3929) Updates the delete table fate operation to use conditional mutations. Also introduces a new mechanism in Ample for processing conditional mutations that does not buffer everything in memory. This avoid buffering large amounts of tablet metadata in memory when deleting a large table. The IT that creates 1 million splits in a table and deletes it passes with these changes. --- .../accumulo/core/metadata/schema/Ample.java | 43 ++++++-- .../AsyncConditionalTabletsMutatorImpl.java | 98 ++++++++++++++++++ .../accumulo/server/metadata/ServerAmpleImpl.java | 7 ++ .../accumulo/manager/tableOps/delete/CleanUp.java | 31 +----- .../manager/tableOps/delete/DeleteTable.java | 2 +- .../manager/tableOps/delete/PreDeleteTable.java | 1 + .../manager/tableOps/delete/ReserveTablets.java | 111 +++++++++++++++++++++ .../test/functional/AmpleConditionalWriterIT.java | 77 ++++++++++++++ 8 files changed, 332 insertions(+), 38 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java index a23e6da08e..44f1715775 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.UUID; +import java.util.function.BiConsumer; import java.util.function.Predicate; import java.util.stream.Stream; @@ -209,7 +210,10 @@ public interface Ample { } /** - * An entry point for updating tablets metadata using a conditional writer. + * An entry point for updating tablets metadata using a conditional writer. The returned mutator + * will buffer everything in memory until {@link ConditionalTabletsMutator#process()} is called. + * If buffering everything in memory is undesirable, then consider using + * {@link #conditionallyMutateTablets(BiConsumer)} * * @see ConditionalTabletMutator#submit(RejectionHandler) */ @@ -217,6 +221,26 @@ public interface Ample { throw new UnsupportedOperationException(); } + /** + * An entry point for updating tablets metadata using a conditional writer asynchronously. This + * will process conditional mutations in the background as they are added. The benefit of this + * method over {@link #conditionallyMutateTablets()} is that it can avoid buffering everything in + * memory. Using this method may also be faster as it allows tablet metadata scans and conditional + * updates of tablets to run concurrently. + * + * @param resultsConsumer as conditional mutations are processed in the background their result is + * passed to this consumer. This consumer should be thread safe as it may be called from a + * different thread. + * @return A conditional tablet mutator that will asynchronously report results. Closing this + * object will force everything to be processed and reported. The returned object is not + * thread safe and is only intended to be used by a single thread. + * @see ConditionalTabletMutator#submit(RejectionHandler) + */ + default AsyncConditionalTabletsMutator + conditionallyMutateTablets(BiConsumer<KeyExtent,ConditionalResult> resultsConsumer) { + throw new UnsupportedOperationException(); + } + default void putGcCandidates(TableId tableId, Collection<StoredTabletFile> candidates) { throw new UnsupportedOperationException(); } @@ -264,7 +288,7 @@ public interface Ample { void close(); } - public interface ConditionalResult { + interface ConditionalResult { /** * This enum was created instead of using {@link ConditionalWriter.Status} because Ample has @@ -292,14 +316,22 @@ public interface Ample { TabletMetadata readMetadata(); } - public interface ConditionalTabletsMutator extends AutoCloseable { - + interface AsyncConditionalTabletsMutator extends AutoCloseable { /** * @return A fluent interface to conditional mutating a tablet. Ensure you call * {@link ConditionalTabletMutator#submit(RejectionHandler)} when finished. */ OperationRequirements mutateTablet(KeyExtent extent); + /** + * Closing ensures that all mutations are processed and their results are reported. + */ + @Override + void close(); + } + + interface ConditionalTabletsMutator extends AsyncConditionalTabletsMutator { + /** * After creating one or more conditional mutations using {@link #mutateTablet(KeyExtent)}, call * this method to process them using a {@link ConditionalWriter} @@ -307,9 +339,6 @@ public interface Ample { * @return The result from the {@link ConditionalWriter} of processing each tablet. */ Map<KeyExtent,ConditionalResult> process(); - - @Override - void close(); } /** diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/AsyncConditionalTabletsMutatorImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/AsyncConditionalTabletsMutatorImpl.java new file mode 100644 index 0000000000..0d31f71eef --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/AsyncConditionalTabletsMutatorImpl.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.metadata; + +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.function.BiConsumer; + +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.util.threads.Threads; +import org.apache.accumulo.server.ServerContext; + +public class AsyncConditionalTabletsMutatorImpl implements Ample.AsyncConditionalTabletsMutator { + private final BiConsumer<KeyExtent,Ample.ConditionalResult> resultsConsumer; + private final ExecutorService executor; + private Future<Map<KeyExtent,Ample.ConditionalResult>> backgroundProcessing = null; + private ConditionalTabletsMutatorImpl bufferingMutator; + private final ServerContext context; + private long mutatedTablets = 0; + public static final int BATCH_SIZE = 1000; + + AsyncConditionalTabletsMutatorImpl(ServerContext context, + BiConsumer<KeyExtent,Ample.ConditionalResult> resultsConsumer) { + this.resultsConsumer = Objects.requireNonNull(resultsConsumer); + this.bufferingMutator = new ConditionalTabletsMutatorImpl(context); + this.context = context; + var creatorId = Thread.currentThread().getId(); + this.executor = Executors.newSingleThreadExecutor(runnable -> Threads.createThread( + "Async conditional tablets mutator background thread, created by : #" + creatorId, + runnable)); + + } + + @Override + public Ample.OperationRequirements mutateTablet(KeyExtent extent) { + if (mutatedTablets > BATCH_SIZE) { + if (backgroundProcessing != null) { + // a previous batch of mutations was submitted for processing so wait on it. + try { + backgroundProcessing.get().forEach(resultsConsumer); + } catch (InterruptedException | ExecutionException e) { + throw new IllegalStateException(e); + } + } + + // Spin up processing of the mutations submitted so far in a background thread. Must copy the + // reference for the background thread because a new one is about to be created. + var bufferingMutatorRef = bufferingMutator; + backgroundProcessing = executor.submit(() -> { + var result = bufferingMutatorRef.process(); + bufferingMutatorRef.close(); + return result; + }); + + bufferingMutator = new ConditionalTabletsMutatorImpl(context); + mutatedTablets = 0; + } + mutatedTablets++; + return bufferingMutator.mutateTablet(extent); + } + + @Override + public void close() { + if (backgroundProcessing != null) { + // a previous batch of mutations was submitted for processing so wait on it. + try { + backgroundProcessing.get().forEach(resultsConsumer); + } catch (InterruptedException | ExecutionException e) { + throw new IllegalStateException(e); + } + } + // process anything not processed so far + bufferingMutator.process().forEach(resultsConsumer); + bufferingMutator.close(); + executor.shutdownNow(); + } +} diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java index f8d916e159..a42bc5f488 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java @@ -28,6 +28,7 @@ import java.util.Iterator; import java.util.Objects; import java.util.Set; import java.util.UUID; +import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -93,6 +94,12 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample { return new ConditionalTabletsMutatorImpl(context); } + @Override + public AsyncConditionalTabletsMutator + conditionallyMutateTablets(BiConsumer<KeyExtent,ConditionalResult> resultsConsumer) { + return new AsyncConditionalTabletsMutatorImpl(context, resultsConsumer); + } + private void mutateRootGcCandidates(Consumer<RootGcCandidates> mutator) { String zpath = context.getZooKeeperRoot() + ZROOT_TABLET_GC_CANDIDATES; try { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/CleanUp.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/CleanUp.java index 0b0512b072..4262443dfc 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/CleanUp.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/CleanUp.java @@ -18,15 +18,10 @@ */ package org.apache.accumulo.manager.tableOps.delete; -import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION; -import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; -import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SUSPEND; - import java.io.IOException; import java.net.UnknownHostException; import java.util.Arrays; import java.util.Map.Entry; -import java.util.Set; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.BatchScanner; @@ -40,11 +35,8 @@ import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.core.iterators.user.GrepIterator; import org.apache.accumulo.core.metadata.MetadataTable; -import org.apache.accumulo.core.metadata.TServerInstance; -import org.apache.accumulo.core.metadata.TabletState; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; -import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.manager.Manager; import org.apache.accumulo.manager.tableOps.ManagerRepo; @@ -87,32 +79,11 @@ class CleanUp extends ManagerRepo { @Override public long isReady(long tid, Manager manager) throws Exception { + // ELASTICITY_TODO investigate this, what is it for and is it still needed? if (!manager.hasCycled(creationTime)) { return 50; } - boolean done = true; - - try (var tablets = manager.getContext().getAmple().readTablets().forTable(tableId) - .fetch(LOCATION, PREV_ROW, SUSPEND).checkConsistency().build()) { - Set<TServerInstance> liveTServers = manager.onlineTabletServers(); - for (TabletMetadata tm : tablets) { - TabletState state = TabletState.compute(tm, liveTServers); - if (!state.equals(TabletState.UNASSIGNED)) { - // This code will even wait on tablets that are assigned to dead tablets servers. This is - // intentional because the manager may make metadata writes for these tablets. See #587 - log.debug("Still waiting for table({}) to be deleted; Target tablet state: UNASSIGNED, " - + "Current tablet state: {}, locationState: {}", tableId, state, tm); - done = false; - break; - } - } - } - - if (!done) { - return 50; - } - return 0; } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/DeleteTable.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/DeleteTable.java index e8eb5c8761..b511056e2a 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/DeleteTable.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/DeleteTable.java @@ -49,7 +49,7 @@ public class DeleteTable extends ManagerRepo { public Repo<Manager> call(long tid, Manager env) { env.getTableManager().transitionTableState(tableId, TableState.DELETING); env.getEventCoordinator().event(tableId, "deleting table %s ", tableId); - return new CleanUp(tableId, namespaceId); + return new ReserveTablets(tableId, namespaceId); } @Override diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/PreDeleteTable.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/PreDeleteTable.java index d9c31c13b8..41ce26dcbb 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/PreDeleteTable.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/PreDeleteTable.java @@ -57,6 +57,7 @@ public class PreDeleteTable extends ManagerRepo { private void preventFutureCompactions(Manager environment) throws KeeperException, InterruptedException { + // ELASTICITY_TODO investigate this. Is still needed? Is it still working as expected? String deleteMarkerPath = createDeleteMarkerPath(environment.getInstanceID(), tableId); ZooReaderWriter zoo = environment.getContext().getZooReaderWriter(); zoo.putPersistentData(deleteMarkerPath, new byte[] {}, NodeExistsPolicy.SKIP); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/ReserveTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/ReserveTablets.java new file mode 100644 index 0000000000..afdd5fc155 --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/ReserveTablets.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager.tableOps.delete; + +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; + +import org.apache.accumulo.core.data.NamespaceId; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.TabletOperationId; +import org.apache.accumulo.core.metadata.schema.TabletOperationType; +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.manager.tableOps.ManagerRepo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ReserveTablets extends ManagerRepo { + + private static final Logger log = LoggerFactory.getLogger(ReserveTablets.class); + + private static final long serialVersionUID = 1L; + + private final TableId tableId; + private final NamespaceId namespaceId; + + public ReserveTablets(TableId tableId, NamespaceId namespaceId) { + this.tableId = tableId; + this.namespaceId = namespaceId; + } + + @Override + public long isReady(long tid, Manager manager) throws Exception { + + var opid = TabletOperationId.from(TabletOperationType.DELETING, tid); + + // The consumer may be called in another thread so use an AtomicLong + AtomicLong accepted = new AtomicLong(0); + BiConsumer<KeyExtent,Ample.ConditionalResult> resultsConsumer = (extent, result) -> { + if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) { + accepted.incrementAndGet(); + } else { + log.debug("Failed to set operation id {} {}", opid, extent); + } + }; + + long locations = 0; + long otherOps = 0; + long submitted = 0; + long tabletsSeen = 0; + + try ( + var tablets = manager.getContext().getAmple().readTablets().forTable(tableId) + .fetch(OPID, PREV_ROW, LOCATION).checkConsistency().build(); + var conditionalMutator = + manager.getContext().getAmple().conditionallyMutateTablets(resultsConsumer)) { + + for (var tabletMeta : tablets) { + tabletsSeen++; + if (tabletMeta.getLocation() != null) { + locations++; + } + + if (tabletMeta.getOperationId() != null) { + if (!opid.equals(tabletMeta.getOperationId())) { + otherOps++; + } + } else { + conditionalMutator.mutateTablet(tabletMeta.getExtent()).requireAbsentOperation() + .putOperation(opid).submit(tm -> opid.equals(tm.getOperationId())); + submitted++; + } + } + } + + if (locations > 0 || otherOps > 0 || submitted != accepted.get()) { + log.debug("Waiting to delete table locations:{} operations:{} submitted:{} accepted:{}", + locations, otherOps, submitted, accepted.get()); + return Math.min(Math.max(100, tabletsSeen), 30000); + } + + return 0; + } + + @Override + public Repo<Manager> call(long tid, Manager manager) throws Exception { + return new CleanUp(tableId, namespaceId); + } +} diff --git a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java index 2ee6c5d1e7..3a7e2d3c9d 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java @@ -26,6 +26,8 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FLUSH_ID; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOADED; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.TIME; import static org.apache.accumulo.core.util.LazySingletons.GSON; @@ -43,8 +45,11 @@ import java.util.Map; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; @@ -62,6 +67,7 @@ import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.TServerInstance; +import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status; import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.metadata.schema.MetadataTime; @@ -73,6 +79,7 @@ import org.apache.accumulo.core.metadata.schema.TabletOperationId; import org.apache.accumulo.core.metadata.schema.TabletOperationType; import org.apache.accumulo.core.security.TablePermission; import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.server.metadata.AsyncConditionalTabletsMutatorImpl; import org.apache.accumulo.server.metadata.ConditionalTabletsMutatorImpl; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; @@ -794,4 +801,74 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness { assertEquals(44L, context.getAmple().readTablet(e1).getFlushId().getAsLong()); } } + + @Test + public void testAsyncMutator() throws Exception { + var table = getUniqueNames(2)[1]; + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + // The AsyncConditionalTabletsMutatorImpl processes batches of conditional mutations. Run + // tests where more than the batch size is processed an ensure this handled correctly. + + TreeSet<Text> splits = + IntStream.range(1, (int) (AsyncConditionalTabletsMutatorImpl.BATCH_SIZE * 2.5)) + .mapToObj(i -> new Text(String.format("%06d", i))) + .collect(Collectors.toCollection(TreeSet::new)); + + assertTrue(splits.size() > AsyncConditionalTabletsMutatorImpl.BATCH_SIZE); + + c.tableOperations().create(table, new NewTableConfiguration().withSplits(splits)); + var tableId = TableId.of(c.tableOperations().tableIdMap().get(table)); + + var ample = cluster.getServerContext().getAmple(); + + AtomicLong accepted = new AtomicLong(0); + AtomicLong total = new AtomicLong(0); + BiConsumer<KeyExtent,Ample.ConditionalResult> resultsConsumer = (extent, result) -> { + if (result.getStatus() == Status.ACCEPTED) { + accepted.incrementAndGet(); + } + total.incrementAndGet(); + }; + + // run a test where a subset of tablets are modified, all modifications should be accepted + var opid1 = TabletOperationId.from(TabletOperationType.MERGING, 50); + + int expected = 0; + try (var tablets = ample.readTablets().forTable(tableId).fetch(OPID, PREV_ROW).build(); + var mutator = ample.conditionallyMutateTablets(resultsConsumer)) { + for (var tablet : tablets) { + if (tablet.getEndRow() != null + && Integer.parseInt(tablet.getEndRow().toString()) % 2 == 0) { + mutator.mutateTablet(tablet.getExtent()).requireAbsentOperation().putOperation(opid1) + .submit(tm -> opid1.equals(tm.getOperationId())); + expected++; + } + } + } + + assertTrue(expected > 0); + assertEquals(expected, accepted.get()); + assertEquals(total.get(), accepted.get()); + + // run test where some will be accepted and some will be rejected and ensure the counts come + // out as expected. + var opid2 = TabletOperationId.from(TabletOperationType.MERGING, 51); + + accepted.set(0); + total.set(0); + + try (var tablets = ample.readTablets().forTable(tableId).fetch(OPID, PREV_ROW).build(); + var mutator = ample.conditionallyMutateTablets(resultsConsumer)) { + for (var tablet : tablets) { + mutator.mutateTablet(tablet.getExtent()).requireAbsentOperation().putOperation(opid2) + .submit(tm -> opid2.equals(tm.getOperationId())); + } + } + + var numTablets = splits.size() + 1; + assertEquals(numTablets - expected, accepted.get()); + assertEquals(numTablets, total.get()); + } + } + }