This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 0edc48f910 Create a test Ample implementation (#4415)
0edc48f910 is described below

commit 0edc48f910f25e5273d62b1d91a7993bf9529b8e
Author: Christopher L. Shannon <cshan...@apache.org>
AuthorDate: Sat May 11 09:08:36 2024 -0400

    Create a test Ample implementation (#4415)
    
    This commit adds a new test Ample implementation that
    can be used to interact with Ample against a different table than the
    table that the Accumulo instance uses. This will allow testing Ample
    without having to deal with metadata being written to the table by the
    system.
    
    The test Ample also will support injecting custom behavior such as
    changing the return status of a conditional mutation in order to test.
    
    This closes #4376
---
 .../accumulo/core/metadata/schema/AmpleImpl.java   |  29 ++-
 .../core/metadata/schema/TabletsMetadata.java      |  15 +-
 pom.xml                                            |   7 +
 server/base/pom.xml                                |  18 ++
 .../AsyncConditionalTabletsMutatorImpl.java        |  11 +-
 .../metadata/ConditionalTabletsMutatorImpl.java    |  17 +-
 .../accumulo/server/metadata/ServerAmpleImpl.java  |  48 ++--
 .../server/metadata/TabletsMutatorImpl.java        |  14 +-
 .../metadata/ConditionalWriterDelegator.java       |  54 +++++
 .../metadata/ConditionalWriterInterceptor.java     |  86 +++++++
 .../apache/accumulo/server/metadata/TestAmple.java | 264 +++++++++++++++++++++
 test/pom.xml                                       |   6 +
 .../apache/accumulo/test/ample/TestAmpleIT.java    | 190 +++++++++++++++
 .../apache/accumulo/test/ample/TestAmpleUtil.java  |  38 +++
 .../apache/accumulo/test/fate/ManagerRepoIT.java   | 225 ++++++++++++++++++
 15 files changed, 992 insertions(+), 30 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/AmpleImpl.java 
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/AmpleImpl.java
index 89ce240cad..cdad8708e6 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/AmpleImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/AmpleImpl.java
@@ -21,23 +21,34 @@ package org.apache.accumulo.core.metadata.schema;
 import static com.google.common.collect.MoreCollectors.onlyElement;
 
 import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.function.Function;
 
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
 import org.apache.accumulo.core.metadata.schema.TabletsMetadata.Options;
+import org.apache.accumulo.core.metadata.schema.TabletsMetadata.TableOptions;
+
+import com.google.common.base.Preconditions;
 
 public class AmpleImpl implements Ample {
   private final AccumuloClient client;
+  private final Function<DataLevel,String> tableMapper;
 
   public AmpleImpl(AccumuloClient client) {
+    this(client, DataLevel::metaTable);
+  }
+
+  public AmpleImpl(AccumuloClient client, Function<DataLevel,String> 
tableMapper) {
     this.client = client;
+    this.tableMapper = Objects.requireNonNull(tableMapper);
   }
 
   @Override
   public TabletMetadata readTablet(KeyExtent extent, ReadConsistency 
readConsistency,
       ColumnType... colsToFetch) {
-    Options builder = TabletsMetadata.builder(client).forTablet(extent);
+    Options builder = newBuilder().forTablet(extent);
     if (colsToFetch.length > 0) {
       builder.fetch(colsToFetch);
     }
@@ -53,7 +64,21 @@ public class AmpleImpl implements Ample {
 
   @Override
   public TabletsMetadata.TableOptions readTablets() {
-    return TabletsMetadata.builder(this.client);
+    return newBuilder();
+  }
+
+  protected TableOptions newBuilder() {
+    return TabletsMetadata.builder(this.client, getTableMapper());
+  }
+
+  protected String getMetadataTableName(Ample.DataLevel dataLevel) {
+    final String metadataTable = getTableMapper().apply(dataLevel);
+    Preconditions.checkArgument(metadataTable != null,
+        "A metadata table for %s has not been registered", dataLevel);
+    return metadataTable;
   }
 
+  protected Function<DataLevel,String> getTableMapper() {
+    return tableMapper;
+  }
 }
diff --git 
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
 
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
index 48a1160513..c91e2d54de 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
@@ -117,9 +117,11 @@ public class TabletsMetadata implements 
Iterable<TabletMetadata>, AutoCloseable
     private ReadConsistency readConsistency = ReadConsistency.IMMEDIATE;
     private final AccumuloClient _client;
     private final List<TabletMetadataFilter> tabletMetadataFilters = new 
ArrayList<>();
+    private final Function<DataLevel,String> tableMapper;
 
-    Builder(AccumuloClient client) {
+    Builder(AccumuloClient client, Function<DataLevel,String> tableMapper) {
       this._client = client;
+      this.tableMapper = Objects.requireNonNull(tableMapper);
     }
 
     @Override
@@ -180,7 +182,7 @@ public class TabletsMetadata implements 
Iterable<TabletMetadata>, AutoCloseable
         } else {
           try {
             BatchScanner scanner =
-                client.createBatchScanner(level.metaTable(), 
Authorizations.EMPTY);
+                client.createBatchScanner(tableMapper.apply(level), 
Authorizations.EMPTY);
 
             var ranges =
                 
groupedExtents.get(level).stream().map(KeyExtent::toMetaRange).collect(toList());
@@ -253,7 +255,7 @@ public class TabletsMetadata implements 
Iterable<TabletMetadata>, AutoCloseable
     private TabletsMetadata buildNonRoot(AccumuloClient client) {
       try {
 
-        String resolvedTable = table == null ? level.metaTable() : table;
+        String resolvedTable = table == null ? tableMapper.apply(level) : 
table;
 
         Scanner scanner =
             new IsolatedScanner(client.createScanner(resolvedTable, 
Authorizations.EMPTY));
@@ -653,7 +655,12 @@ public class TabletsMetadata implements 
Iterable<TabletMetadata>, AutoCloseable
   }
 
   public static TableOptions builder(AccumuloClient client) {
-    return new Builder(client);
+    return new Builder(client, DataLevel::metaTable);
+  }
+
+  public static TableOptions builder(AccumuloClient client,
+      Function<DataLevel,String> tableMapper) {
+    return new Builder(client, tableMapper);
   }
 
   private static TabletMetadata getRootMetadata(ClientContext ctx,
diff --git a/pom.xml b/pom.xml
index aeacdf925e..ea8923d947 100644
--- a/pom.xml
+++ b/pom.xml
@@ -624,6 +624,13 @@
         <artifactId>snakeyaml</artifactId>
         <version>2.2</version>
       </dependency>
+      <dependency>
+        <groupId>org.apache.accumulo</groupId>
+        <artifactId>accumulo-server-base</artifactId>
+        <version>${project.version}</version>
+        <type>test-jar</type>
+        <scope>test</scope>
+      </dependency>
     </dependencies>
   </dependencyManagement>
   <dependencies>
diff --git a/server/base/pom.xml b/server/base/pom.xml
index c20b3e6edc..d7f23eb028 100644
--- a/server/base/pom.xml
+++ b/server/base/pom.xml
@@ -136,6 +136,24 @@
         <directory>src/test/resources</directory>
       </testResource>
     </testResources>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+            <configuration>
+              <includes>
+                <include>**/metadata/**</include>
+              </includes>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
   </build>
   <profiles>
     <profile>
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/metadata/AsyncConditionalTabletsMutatorImpl.java
 
b/server/base/src/main/java/org/apache/accumulo/server/metadata/AsyncConditionalTabletsMutatorImpl.java
index d7666361f7..46532681d9 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/metadata/AsyncConditionalTabletsMutatorImpl.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/metadata/AsyncConditionalTabletsMutatorImpl.java
@@ -25,9 +25,11 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.function.Consumer;
+import java.util.function.Function;
 
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
 import org.apache.accumulo.core.util.threads.Threads;
 import org.apache.accumulo.server.ServerContext;
 
@@ -39,16 +41,18 @@ public class AsyncConditionalTabletsMutatorImpl implements 
Ample.AsyncConditiona
   private final ServerContext context;
   private long mutatedTablets = 0;
   public static final int BATCH_SIZE = 1000;
+  private final Function<DataLevel,String> tableMapper;
 
-  AsyncConditionalTabletsMutatorImpl(ServerContext context,
+  AsyncConditionalTabletsMutatorImpl(ServerContext context, 
Function<DataLevel,String> tableMapper,
       Consumer<Ample.ConditionalResult> resultsConsumer) {
     this.resultsConsumer = Objects.requireNonNull(resultsConsumer);
-    this.bufferingMutator = new ConditionalTabletsMutatorImpl(context);
     this.context = context;
+    this.bufferingMutator = new ConditionalTabletsMutatorImpl(context, 
tableMapper);
     var creatorId = Thread.currentThread().getId();
     this.executor = Executors.newSingleThreadExecutor(runnable -> 
Threads.createThread(
         "Async conditional tablets mutator background thread, created by : #" 
+ creatorId,
         runnable));
+    this.tableMapper = Objects.requireNonNull(tableMapper);
 
   }
 
@@ -73,7 +77,7 @@ public class AsyncConditionalTabletsMutatorImpl implements 
Ample.AsyncConditiona
         return result;
       });
 
-      bufferingMutator = new ConditionalTabletsMutatorImpl(context);
+      bufferingMutator = new ConditionalTabletsMutatorImpl(context, 
tableMapper);
       mutatedTablets = 0;
     }
     mutatedTablets++;
@@ -95,4 +99,5 @@ public class AsyncConditionalTabletsMutatorImpl implements 
Ample.AsyncConditiona
     bufferingMutator.close();
     executor.shutdownNow();
   }
+
 }
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java
 
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java
index 2e999033fa..2ddf62fbc8 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java
@@ -24,8 +24,10 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.client.AccumuloException;
@@ -35,6 +37,7 @@ import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.data.ConditionalMutation;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.util.Retry;
 import org.apache.accumulo.server.ServerContext;
@@ -61,9 +64,16 @@ public class ConditionalTabletsMutatorImpl implements 
Ample.ConditionalTabletsMu
   private boolean active = true;
 
   Map<KeyExtent,Ample.RejectionHandler> rejectedHandlers = new HashMap<>();
+  private final Function<DataLevel,String> tableMapper;
 
   public ConditionalTabletsMutatorImpl(ServerContext context) {
+    this(context, DataLevel::metaTable);
+  }
+
+  public ConditionalTabletsMutatorImpl(ServerContext context,
+      Function<DataLevel,String> tableMapper) {
     this.context = context;
+    this.tableMapper = Objects.requireNonNull(tableMapper);
   }
 
   @Override
@@ -90,7 +100,7 @@ public class ConditionalTabletsMutatorImpl implements 
Ample.ConditionalTabletsMu
     if (dataLevel == Ample.DataLevel.ROOT) {
       return new RootConditionalWriter(context);
     } else {
-      return context.createConditionalWriter(dataLevel.metaTable());
+      return 
context.createConditionalWriter(getTableMapper().apply(dataLevel));
     }
   }
 
@@ -293,4 +303,9 @@ public class ConditionalTabletsMutatorImpl implements 
Ample.ConditionalTabletsMu
 
   @Override
   public void close() {}
+
+  protected Function<DataLevel,String> getTableMapper() {
+    return tableMapper;
+  }
+
 }
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
 
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
index 12419be290..abb297ad23 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
@@ -29,6 +29,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
@@ -68,13 +69,18 @@ public class ServerAmpleImpl extends AmpleImpl implements 
Ample {
 
   private static Logger log = LoggerFactory.getLogger(ServerAmpleImpl.class);
 
-  private ServerContext context;
+  private final ServerContext context;
 
   public ServerAmpleImpl(ServerContext context) {
     super(context);
     this.context = context;
   }
 
+  public ServerAmpleImpl(ServerContext context, Function<DataLevel,String> 
tableMapper) {
+    super(context, tableMapper);
+    this.context = context;
+  }
+
   @Override
   public Ample.TabletMutator mutateTablet(KeyExtent extent) {
     TabletsMutator tmi = mutateTablets();
@@ -85,18 +91,18 @@ public class ServerAmpleImpl extends AmpleImpl implements 
Ample {
 
   @Override
   public TabletsMutator mutateTablets() {
-    return new TabletsMutatorImpl(context);
+    return new TabletsMutatorImpl(context, getTableMapper());
   }
 
   @Override
   public ConditionalTabletsMutator conditionallyMutateTablets() {
-    return new ConditionalTabletsMutatorImpl(context);
+    return new ConditionalTabletsMutatorImpl(context, getTableMapper());
   }
 
   @Override
   public AsyncConditionalTabletsMutator
       conditionallyMutateTablets(Consumer<ConditionalResult> resultsConsumer) {
-    return new AsyncConditionalTabletsMutatorImpl(context, resultsConsumer);
+    return new AsyncConditionalTabletsMutatorImpl(context, getTableMapper(), 
resultsConsumer);
   }
 
   private void mutateRootGcCandidates(Consumer<RootGcCandidates> mutator) {
@@ -131,7 +137,7 @@ public class ServerAmpleImpl extends AmpleImpl implements 
Ample {
       return;
     }
 
-    try (BatchWriter writer = 
context.createBatchWriter(DataLevel.of(tableId).metaTable())) {
+    try (BatchWriter writer = 
context.createBatchWriter(getMetaTable(DataLevel.of(tableId)))) {
       for (StoredTabletFile file : candidates) {
         writer.addMutation(createDeleteMutation(file));
       }
@@ -150,7 +156,7 @@ public class ServerAmpleImpl extends AmpleImpl implements 
Ample {
       return;
     }
 
-    try (BatchWriter writer = 
context.createBatchWriter(DataLevel.of(tableId).metaTable())) {
+    try (BatchWriter writer = 
context.createBatchWriter(getMetaTable(DataLevel.of(tableId)))) {
       for (var fileOrDir : candidates) {
         writer.addMutation(createDeleteMutation(fileOrDir));
       }
@@ -204,7 +210,7 @@ public class ServerAmpleImpl extends AmpleImpl implements 
Ample {
       return;
     }
 
-    try (BatchWriter writer = context.createBatchWriter(level.metaTable())) {
+    try (BatchWriter writer = context.createBatchWriter(getMetaTable(level))) {
       if (type == GcCandidateType.VALID) {
         for (GcCandidate candidate : candidates) {
           Mutation m = new 
Mutation(DeletesSection.encodeRow(candidate.getPath()));
@@ -242,7 +248,7 @@ public class ServerAmpleImpl extends AmpleImpl implements 
Ample {
 
       Scanner scanner;
       try {
-        scanner = context.createScanner(level.metaTable(), 
Authorizations.EMPTY);
+        scanner = context.createScanner(getMetaTable(level), 
Authorizations.EMPTY);
       } catch (TableNotFoundException e) {
         throw new IllegalStateException(e);
       }
@@ -274,7 +280,8 @@ public class ServerAmpleImpl extends AmpleImpl implements 
Ample {
 
   @Override
   public void putScanServerFileReferences(Collection<ScanServerRefTabletFile> 
scanRefs) {
-    try (BatchWriter writer = 
context.createBatchWriter(DataLevel.USER.metaTable())) {
+    var metaTable = getMetaTable(DataLevel.USER);
+    try (BatchWriter writer = context.createBatchWriter(metaTable)) {
       String prefix = ScanServerFileReferenceSection.getRowPrefix();
       for (ScanServerRefTabletFile ref : scanRefs) {
         Mutation m = new Mutation(prefix + ref.getRowSuffix());
@@ -283,21 +290,22 @@ public class ServerAmpleImpl extends AmpleImpl implements 
Ample {
       }
     } catch (MutationsRejectedException | TableNotFoundException e) {
       throw new IllegalStateException(
-          "Error inserting scan server file references into " + 
DataLevel.USER.metaTable(), e);
+          "Error inserting scan server file references into " + metaTable, e);
     }
   }
 
   @Override
   public Stream<ScanServerRefTabletFile> getScanServerFileReferences() {
+    var metaTable = getMetaTable(DataLevel.USER);
     try {
-      Scanner scanner = context.createScanner(DataLevel.USER.metaTable(), 
Authorizations.EMPTY);
+      Scanner scanner = context.createScanner(metaTable, Authorizations.EMPTY);
       scanner.setRange(ScanServerFileReferenceSection.getRange());
       int pLen = ScanServerFileReferenceSection.getRowPrefix().length();
       return scanner.stream().onClose(scanner::close)
           .map(e -> new 
ScanServerRefTabletFile(e.getKey().getRowData().toString().substring(pLen),
               e.getKey().getColumnFamily(), e.getKey().getColumnQualifier()));
     } catch (TableNotFoundException e) {
-      throw new IllegalStateException(DataLevel.USER.metaTable() + " not 
found!", e);
+      throw new IllegalStateException(metaTable + " not found!", e);
     }
   }
 
@@ -305,8 +313,8 @@ public class ServerAmpleImpl extends AmpleImpl implements 
Ample {
   public void deleteScanServerFileReferences(String serverAddress, UUID 
scanServerLockUUID) {
     Objects.requireNonNull(serverAddress, "Server address must be supplied");
     Objects.requireNonNull(scanServerLockUUID, "Server uuid must be supplied");
-    try (
-        Scanner scanner = context.createScanner(DataLevel.USER.metaTable(), 
Authorizations.EMPTY)) {
+    var metaTable = getMetaTable(DataLevel.USER);
+    try (Scanner scanner = context.createScanner(metaTable, 
Authorizations.EMPTY)) {
       scanner.setRange(ScanServerFileReferenceSection.getRange());
       scanner.fetchColumn(new Text(serverAddress), new 
Text(scanServerLockUUID.toString()));
 
@@ -320,13 +328,13 @@ public class ServerAmpleImpl extends AmpleImpl implements 
Ample {
         this.deleteScanServerFileReferences(refsToDelete);
       }
     } catch (TableNotFoundException e) {
-      throw new IllegalStateException(DataLevel.USER.metaTable() + " not 
found!", e);
+      throw new IllegalStateException(metaTable + " not found!", e);
     }
   }
 
   @Override
   public void 
deleteScanServerFileReferences(Collection<ScanServerRefTabletFile> 
refsToDelete) {
-    try (BatchWriter writer = 
context.createBatchWriter(DataLevel.USER.metaTable())) {
+    try (BatchWriter writer = 
context.createBatchWriter(getMetaTable(DataLevel.USER))) {
       String prefix = ScanServerFileReferenceSection.getRowPrefix();
       for (ScanServerRefTabletFile ref : refsToDelete) {
         Mutation m = new Mutation(prefix + ref.getRowSuffix());
@@ -338,4 +346,12 @@ public class ServerAmpleImpl extends AmpleImpl implements 
Ample {
       throw new IllegalStateException(e);
     }
   }
+
+  ServerContext getContext() {
+    return context;
+  }
+
+  private String getMetaTable(DataLevel dataLevel) {
+    return getTableMapper().apply(dataLevel);
+  }
 }
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletsMutatorImpl.java
 
b/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletsMutatorImpl.java
index 2ce5b66d9c..72e4703ca1 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletsMutatorImpl.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletsMutatorImpl.java
@@ -18,6 +18,9 @@
  */
 package org.apache.accumulo.server.metadata;
 
+import java.util.Objects;
+import java.util.function.Function;
+
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.client.TableNotFoundException;
@@ -25,6 +28,7 @@ import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.metadata.AccumuloTable;
 import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
 import org.apache.accumulo.core.metadata.schema.Ample.TabletsMutator;
 import org.apache.accumulo.server.ServerContext;
 
@@ -32,13 +36,15 @@ import com.google.common.base.Preconditions;
 
 public class TabletsMutatorImpl implements TabletsMutator {
 
-  private ServerContext context;
+  private final ServerContext context;
 
   private BatchWriter rootWriter;
   private BatchWriter metaWriter;
+  private final Function<DataLevel,String> tableMapper;
 
-  public TabletsMutatorImpl(ServerContext context) {
+  TabletsMutatorImpl(ServerContext context, Function<DataLevel,String> 
tableMapper) {
     this.context = context;
+    this.tableMapper = Objects.requireNonNull(tableMapper);
   }
 
   private BatchWriter getWriter(TableId tableId) {
@@ -48,13 +54,13 @@ public class TabletsMutatorImpl implements TabletsMutator {
     try {
       if (AccumuloTable.METADATA.tableId().equals(tableId)) {
         if (rootWriter == null) {
-          rootWriter = 
context.createBatchWriter(AccumuloTable.ROOT.tableName());
+          rootWriter = 
context.createBatchWriter(tableMapper.apply(DataLevel.METADATA));
         }
 
         return rootWriter;
       } else {
         if (metaWriter == null) {
-          metaWriter = 
context.createBatchWriter(AccumuloTable.METADATA.tableName());
+          metaWriter = 
context.createBatchWriter(tableMapper.apply(DataLevel.USER));
         }
 
         return metaWriter;
diff --git 
a/server/base/src/test/java/org/apache/accumulo/server/metadata/ConditionalWriterDelegator.java
 
b/server/base/src/test/java/org/apache/accumulo/server/metadata/ConditionalWriterDelegator.java
new file mode 100644
index 0000000000..af4e3ad771
--- /dev/null
+++ 
b/server/base/src/test/java/org/apache/accumulo/server/metadata/ConditionalWriterDelegator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.metadata;
+
+import java.util.Iterator;
+
+import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.core.data.ConditionalMutation;
+
+class ConditionalWriterDelegator implements ConditionalWriter {
+  private final ConditionalWriter delegate;
+  private final ConditionalWriterInterceptor interceptor;
+
+  public ConditionalWriterDelegator(ConditionalWriter delegate,
+      ConditionalWriterInterceptor interceptor) {
+    this.delegate = delegate;
+    this.interceptor = interceptor;
+  }
+
+  @Override
+  public Iterator<Result> write(Iterator<ConditionalMutation> mutations) {
+    mutations = interceptor.beforeWrite(mutations);
+    return interceptor.afterWrite(delegate.write(mutations));
+  }
+
+  @Override
+  public Result write(ConditionalMutation mutation) {
+    mutation = interceptor.beforeWrite(mutation);
+    return interceptor.afterWrite(delegate.write(mutation));
+  }
+
+  @Override
+  public void close() {
+    interceptor.beforeClose();
+    delegate.close();
+    interceptor.afterClose();
+  }
+}
diff --git 
a/server/base/src/test/java/org/apache/accumulo/server/metadata/ConditionalWriterInterceptor.java
 
b/server/base/src/test/java/org/apache/accumulo/server/metadata/ConditionalWriterInterceptor.java
new file mode 100644
index 0000000000..36ce45b6e6
--- /dev/null
+++ 
b/server/base/src/test/java/org/apache/accumulo/server/metadata/ConditionalWriterInterceptor.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.metadata;
+
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.client.ConditionalWriter.Result;
+import org.apache.accumulo.core.client.ConditionalWriter.Status;
+import org.apache.accumulo.core.data.ConditionalMutation;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Streams;
+
+public interface ConditionalWriterInterceptor {
+
+  default Iterator<ConditionalMutation> 
beforeWrite(Iterator<ConditionalMutation> mutations) {
+    return mutations;
+  }
+
+  default Iterator<Result> afterWrite(Iterator<Result> results) {
+    return results;
+  }
+
+  default ConditionalMutation beforeWrite(ConditionalMutation mutation) {
+    return mutation;
+  }
+
+  default Result afterWrite(Result result) {
+    return result;
+  }
+
+  default void beforeClose() {
+
+  }
+
+  default void afterClose() {
+
+  }
+
+  static ConditionalWriterInterceptor withStatus(Status replaced, int times) {
+    return withStatus(null, replaced, times);
+  }
+
+  static ConditionalWriterInterceptor withStatus(Status firstExpected, Status 
replaced, int times) {
+    final AtomicInteger count = new AtomicInteger();
+    return new ConditionalWriterInterceptor() {
+      @Override
+      public Iterator<Result> afterWrite(Iterator<Result> results) {
+        if (count.getAndIncrement() < times) {
+          // For the first run only, make sure each state matches 
firstExpected if not null
+          // for other runs don't check since we are changing state so future 
runs may not match
+          return Streams.stream(results).map(r -> {
+            try {
+              Preconditions
+                  .checkState(times > 1 || firstExpected == null || 
r.getStatus() == firstExpected);
+            } catch (IllegalStateException e) {
+              throw e;
+            } catch (Exception e) {
+              throw new IllegalStateException(e);
+            }
+            return new Result(replaced, r.getMutation(), r.getTabletServer());
+          }).collect(Collectors.toList()).iterator();
+        }
+        return results;
+      }
+    };
+  }
+}
diff --git 
a/server/base/src/test/java/org/apache/accumulo/server/metadata/TestAmple.java 
b/server/base/src/test/java/org/apache/accumulo/server/metadata/TestAmple.java
new file mode 100644
index 0000000000..88b768f77a
--- /dev/null
+++ 
b/server/base/src/test/java/org/apache/accumulo/server/metadata/TestAmple.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.metadata;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.SortedMap;
+import java.util.function.BiPredicate;
+import java.util.function.Consumer;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.client.admin.TabletAvailability;
+import org.apache.accumulo.core.client.admin.TabletInformation;
+import org.apache.accumulo.core.client.admin.TimeType;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.iterators.user.WholeRowIterator;
+import org.apache.accumulo.core.lock.ServiceLock;
+import org.apache.accumulo.core.metadata.AccumuloTable;
+import org.apache.accumulo.core.metadata.schema.Ample;
+import 
org.apache.accumulo.core.metadata.schema.Ample.ConditionalTabletsMutator;
+import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataTime;
+import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletsMetadata.TableOptions;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.ColumnFQ;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.hadoop.io.Text;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.MoreCollectors;
+
+public class TestAmple {
+
+  private static final ConditionalWriterInterceptor EMPTY_INTERCEPTOR =
+      new ConditionalWriterInterceptor() {};
+
+  private static final BiPredicate<Key,Value> INCLUDE_ALL_COLUMNS = (k, v) -> 
true;
+
+  public static BiPredicate<Key,Value> matches(ColumnFQ column) {
+    return (k, v) -> column.equals(new ColumnFQ(k));
+  }
+
+  public static BiPredicate<Key,Value> not(ColumnFQ column) {
+    return matches(column).negate();
+  }
+
+  public static Ample create(ServerContext context, Map<DataLevel,String> 
tables,
+      Supplier<ConditionalWriterInterceptor> cwInterceptor) {
+    return new TestServerAmpleImpl(context, tables, cwInterceptor);
+  }
+
+  public static Ample create(ServerContext context, Map<DataLevel,String> 
tables) {
+    return create(context, tables, () -> EMPTY_INTERCEPTOR);
+  }
+
+  public static class TestServerAmpleImpl extends ServerAmpleImpl {
+
+    private final Map<DataLevel,String> tables;
+    private final Supplier<ConditionalWriterInterceptor> cwInterceptor;
+    private final Supplier<ServerContext> testContext =
+        Suppliers.memoize(() -> testAmpleServerContext(super.getContext(), 
this));
+
+    private TestServerAmpleImpl(ServerContext context, Map<DataLevel,String> 
tables,
+        Supplier<ConditionalWriterInterceptor> cwInterceptor) {
+      super(context, tables::get);
+      this.tables = Map.copyOf(tables);
+      this.cwInterceptor = Objects.requireNonNull(cwInterceptor);
+      Preconditions.checkArgument(!tables.containsKey(DataLevel.ROOT));
+      Preconditions.checkArgument(
+          tables.containsKey(DataLevel.USER) || 
tables.containsKey(DataLevel.METADATA));
+    }
+
+    @Override
+    public TabletsMutator mutateTablets() {
+      return new TabletsMutatorImpl(getContext(), tables::get);
+    }
+
+    @Override
+    public ConditionalTabletsMutator conditionallyMutateTablets() {
+      return conditionallyMutateTablets(cwInterceptor.get());
+    }
+
+    @Override
+    public AsyncConditionalTabletsMutator
+        conditionallyMutateTablets(Consumer<ConditionalResult> 
resultsConsumer) {
+      return new AsyncConditionalTabletsMutatorImpl(getContext(), 
getTableMapper(),
+          resultsConsumer);
+    }
+
+    @Override
+    protected TableOptions newBuilder() {
+      return TabletsMetadata.builder(getContext(), getTableMapper());
+    }
+
+    /**
+     * Create default metadata for a Table
+     *
+     * TODO: Add a way to pass in options for config
+     *
+     * @param tableId The id of the table to create metadata for
+     */
+    public void createMetadata(TableId tableId) {
+      try (var tabletsMutator = mutateTablets()) {
+        var extent = new KeyExtent(tableId, null, null);
+        var tabletMutator = tabletsMutator.mutateTablet(extent);
+        String dirName = ServerColumnFamily.DEFAULT_TABLET_DIR_NAME;
+        tabletMutator.putPrevEndRow(extent.prevEndRow());
+        tabletMutator.putDirName(dirName);
+        tabletMutator.putTime(new MetadataTime(0, TimeType.MILLIS));
+        tabletMutator.putTabletAvailability(TabletAvailability.HOSTED);
+        tabletMutator.mutate();
+      } catch (Exception e) {
+        throw new IllegalStateException(e);
+      }
+    }
+
+    /**
+     * Create metadata for a table by copying existing metadata for the table 
from the metadata
+     * table in an existing Accumulo instance
+     *
+     * TODO: Add config parents (such as a way to include/exclude what is 
copied, etc)
+     *
+     * @param client The client to scan the existing accumulo metadata table
+     * @param tableId The id of the table to create metadata for
+     * @throws Exception thrown for any error on metadata creation
+     */
+    public void createMetadataFromExisting(AccumuloClient client, TableId 
tableId)
+        throws Exception {
+      createMetadataFromExisting(client, tableId, INCLUDE_ALL_COLUMNS);
+    }
+
+    public void createMetadataFromExisting(AccumuloClient client, TableId 
tableId,
+        BiPredicate<Key,Value> includeColumn) throws Exception {
+      try (Scanner scanner =
+          client.createScanner(AccumuloTable.METADATA.tableName(), 
Authorizations.EMPTY)) {
+        scanner.setRange(TabletsSection.getRange(tableId));
+        IteratorSetting iterSetting = new IteratorSetting(100, 
WholeRowIterator.class);
+        scanner.addScanIterator(iterSetting);
+
+        try (BatchWriter bw =
+            
client.createBatchWriter(getMetadataTableName(DataLevel.of(tableId)))) {
+          for (Entry<Key,Value> entry : scanner) {
+            final SortedMap<Key,Value> decodedRow =
+                WholeRowIterator.decodeRow(entry.getKey(), entry.getValue());
+            Text row = decodedRow.firstKey().getRow();
+            Mutation m = new Mutation(row);
+
+            decodedRow.entrySet().stream().filter(e -> 
includeColumn.test(e.getKey(), e.getValue()))
+                .forEach(e -> m.put(e.getKey().getColumnFamily(), 
e.getKey().getColumnQualifier(),
+                    e.getKey().getColumnVisibilityParsed(), 
e.getKey().getTimestamp(),
+                    e.getValue()));
+            bw.addMutation(m);
+          }
+        }
+      }
+    }
+
+    private ConditionalTabletsMutator
+        conditionallyMutateTablets(ConditionalWriterInterceptor interceptor) {
+      Objects.requireNonNull(interceptor);
+
+      return new ConditionalTabletsMutatorImpl(getContext(), tables::get) {
+
+        @Override
+        protected ConditionalWriter createConditionalWriter(Ample.DataLevel 
dataLevel)
+            throws TableNotFoundException {
+          if (dataLevel == Ample.DataLevel.ROOT) {
+            return super.createConditionalWriter(dataLevel);
+          } else {
+            return new ConditionalWriterDelegator(
+                
getContext().createConditionalWriter(getTableMapper().apply(dataLevel)),
+                interceptor);
+          }
+        }
+      };
+    }
+
+    @Override
+    ServerContext getContext() {
+      return testContext.get();
+    }
+
+  }
+
+  public static ServerContext testAmpleServerContext(ServerContext context,
+      TestAmple.TestServerAmpleImpl ample) {
+    SiteConfiguration siteConfig;
+    try {
+      Map<String,String> propsMap = new HashMap<>();
+      context.getSiteConfiguration().getProperties(propsMap, x -> true);
+      siteConfig = SiteConfiguration.empty().withOverrides(propsMap).build();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    return new ServerContext(siteConfig) {
+      @Override
+      public Ample getAmple() {
+        return ample;
+      }
+
+      @Override
+      public ServiceLock getServiceLock() {
+        return context.getServiceLock();
+      }
+
+      @Override
+      public void setServiceLock(ServiceLock lock) {
+        context.setServiceLock(lock);
+      }
+    };
+  }
+
+  public static void createMetadataTable(ClientContext client, String table) 
throws Exception {
+    final var metadataTableProps =
+        
client.tableOperations().getTableProperties(AccumuloTable.METADATA.tableName());
+
+    TabletAvailability availability;
+    try (var tabletStream = client.tableOperations()
+        .getTabletInformation(AccumuloTable.METADATA.tableName(), new 
Range())) {
+      availability = 
tabletStream.map(TabletInformation::getTabletAvailability).distinct()
+          .collect(MoreCollectors.onlyElement());
+    }
+
+    var newTableConf = new 
NewTableConfiguration().withInitialTabletAvailability(availability)
+        .withoutDefaultIterators().setProperties(metadataTableProps);
+    client.tableOperations().create(table, newTableConf);
+  }
+}
diff --git a/test/pom.xml b/test/pom.xml
index 709dbe2975..8e5249cc17 100644
--- a/test/pom.xml
+++ b/test/pom.xml
@@ -111,6 +111,12 @@
       <groupId>org.apache.accumulo</groupId>
       <artifactId>accumulo-server-base</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.accumulo</groupId>
+      <artifactId>accumulo-server-base</artifactId>
+      <type>test-jar</type>
+      <scope>compile</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.accumulo</groupId>
       <artifactId>accumulo-shell</artifactId>
diff --git a/test/src/main/java/org/apache/accumulo/test/ample/TestAmpleIT.java 
b/test/src/main/java/org/apache/accumulo/test/ample/TestAmpleIT.java
new file mode 100644
index 0000000000..5f52ae1339
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/ample/TestAmpleIT.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.ample;
+
+import static 
org.apache.accumulo.core.client.ConditionalWriter.Status.ACCEPTED;
+import static org.apache.accumulo.core.client.ConditionalWriter.Status.UNKNOWN;
+import static 
org.apache.accumulo.server.metadata.ConditionalWriterInterceptor.withStatus;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.fate.FateId;
+import org.apache.accumulo.core.fate.FateInstanceType;
+import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status;
+import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
+import org.apache.accumulo.core.metadata.schema.TabletOperationId;
+import org.apache.accumulo.core.metadata.schema.TabletOperationType;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.server.metadata.TestAmple;
+import org.apache.accumulo.server.metadata.TestAmple.TestServerAmpleImpl;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import com.google.common.base.Preconditions;
+
+public class TestAmpleIT extends SharedMiniClusterBase {
+
+  @BeforeAll
+  public static void setup() throws Exception {
+    SharedMiniClusterBase.startMiniCluster();
+  }
+
+  @AfterAll
+  public static void teardown() {
+    SharedMiniClusterBase.stopMiniCluster();
+  }
+
+  @Test
+  public void testCreateMetadataFromExisting() throws Exception {
+
+    String[] tableNames = getUniqueNames(2);
+    String metadataTable = tableNames[0];
+    String userTable = tableNames[1];
+
+    try (ClientContext client =
+        (ClientContext) Accumulo.newClient().from(getClientProps()).build()) {
+      client.tableOperations().create(userTable);
+
+      TestAmple.createMetadataTable(client, metadataTable);
+
+      TableId tableId = 
TableId.of(client.tableOperations().tableIdMap().get(userTable));
+
+      TestServerAmpleImpl ample = (TestServerAmpleImpl) TestAmple
+          .create(getCluster().getServerContext(), Map.of(DataLevel.USER, 
metadataTable));
+
+      ample.createMetadataFromExisting(client, tableId);
+
+      var count = new AtomicInteger();
+      try (var tablets = 
ample.readTablets().forTable(tableId).build().stream()) {
+        tablets.forEach(tm -> {
+          assertNotNull(tm.getTableId());
+          assertNotNull(tm.getExtent());
+          assertNotNull(tm.getTabletAvailability());
+          assertNotNull(tm.getTime());
+          count.incrementAndGet();
+        });
+      }
+      assertEquals(1, count.get());
+    }
+  }
+
+  @Test
+  public void testCreateMetadata() throws Exception {
+    String[] tableNames = getUniqueNames(2);
+    String metadataTable = tableNames[0];
+    String userTable = tableNames[1];
+
+    try (ClientContext client =
+        (ClientContext) Accumulo.newClient().from(getClientProps()).build()) {
+      client.tableOperations().create(userTable);
+      TestAmple.createMetadataTable(client, metadataTable);
+
+      TableId tableId = TableId.of("1");
+      TestServerAmpleImpl ample = (TestServerAmpleImpl) TestAmple
+          .create(getCluster().getServerContext(), Map.of(DataLevel.USER, 
metadataTable));
+
+      ample.createMetadata(tableId);
+
+      var count = new AtomicInteger();
+      try (var tablets = 
ample.readTablets().forTable(tableId).build().stream()) {
+        tablets.forEach(tm -> {
+          assertNotNull(tm.getTableId());
+          assertNotNull(tm.getExtent());
+          assertNotNull(tm.getTabletAvailability());
+          assertNotNull(tm.getTime());
+          count.incrementAndGet();
+        });
+      }
+      assertEquals(1, count.get());
+    }
+
+  }
+
+  // This is an example test showing how to test a conditional
+  // mutation rejection handler by using a ConditionalWriterInterceptor
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testUnknownStatus(boolean accepted) throws Exception {
+
+    String[] tableNames = getUniqueNames(2);
+    String metadataTable = tableNames[0] + accepted;
+    String userTable = tableNames[1] + accepted;
+
+    try (ClientContext client =
+        (ClientContext) Accumulo.newClient().from(getClientProps()).build()) {
+      client.tableOperations().create(userTable);
+
+      TestAmple.createMetadataTable(client, metadataTable);
+      TableId tableId = 
TableId.of(client.tableOperations().tableIdMap().get(userTable));
+
+      TestServerAmpleImpl ample =
+          (TestServerAmpleImpl) 
TestAmple.create(getCluster().getServerContext(),
+              Map.of(DataLevel.USER, metadataTable), () -> 
withStatus(ACCEPTED, UNKNOWN, 1));
+      ample.createMetadataFromExisting(client, tableId);
+
+      // Add a custom interceptor that will replace the result status with 
UNKNOWN
+      // for only the first time the method is called instead of the actual 
state
+      // (which should be accepted)
+      //
+      // When the result of UNKNOWN is returned, the mutator will trigger a 
retry
+      // and resubmit the mutation. On retry, the mutation should be rejected 
because
+      // the mutation requires an absent operation that will have already been 
set on
+      // the previous submission.
+      //
+      // This will cause the mutator to check the rejection handler to see if 
we should actually
+      // accept the mutation. This test uses a boolean to run twice to test 
both the
+      // case of when the rejection handle will return true and false so that 
we can test that
+      // the state is correctly set to either ACCEPTED or REJECTED after the 
rejection handler is
+      // executed.
+      try (var tabletsMutator = ample.conditionallyMutateTablets()) {
+        var mutator = tabletsMutator.mutateTablet(new KeyExtent(tableId, null, 
null))
+            .requireAbsentOperation();
+        var fateId = FateId.from(FateInstanceType.USER, UUID.randomUUID());
+        var opid = TabletOperationId.from(TabletOperationType.SPLITTING, 
fateId);
+
+        mutator.putOperation(opid);
+
+        // If accepted is true then we want to test the ACCEPTED case so 
return true if
+        // the opid matches. If false we want this to fail to test REJECTED, 
so we return
+        // false no matter what
+        mutator.submit(afterMeta -> accepted && 
opid.equals(afterMeta.getOperationId()));
+
+        var results = tabletsMutator.process();
+        results.values().forEach(result -> {
+          var status = result.getStatus();
+          // check the state is correct
+          Preconditions.checkState(accepted ? status == Status.ACCEPTED : 
status == Status.REJECTED,
+              "Failed %s, %s", status, result.getExtent());
+        });
+      }
+    }
+  }
+
+}
diff --git 
a/test/src/main/java/org/apache/accumulo/test/ample/TestAmpleUtil.java 
b/test/src/main/java/org/apache/accumulo/test/ample/TestAmpleUtil.java
new file mode 100644
index 0000000000..07c9dcae3b
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/ample/TestAmpleUtil.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.ample;
+
+import static 
org.apache.accumulo.server.metadata.TestAmple.testAmpleServerContext;
+
+import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.metadata.TestAmple.TestServerAmpleImpl;
+import org.easymock.EasyMock;
+
+public class TestAmpleUtil {
+
+  public static Manager mockWithAmple(ServerContext context, 
TestServerAmpleImpl ample) {
+    Manager manager = EasyMock.mock(Manager.class);
+    
EasyMock.expect(manager.getContext()).andReturn(testAmpleServerContext(context, 
ample))
+        .atLeastOnce();
+    EasyMock.replay(manager);
+    return manager;
+  }
+
+}
diff --git 
a/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT.java 
b/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT.java
new file mode 100644
index 0000000000..82ae56d75f
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.fate;
+
+import static org.apache.accumulo.server.metadata.TestAmple.not;
+import static org.apache.accumulo.test.ample.TestAmpleUtil.mockWithAmple;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.fate.FateId;
+import org.apache.accumulo.core.fate.FateInstanceType;
+import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
+import org.apache.accumulo.core.metadata.schema.Ample.TabletsMutator;
+import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SplitColumnFamily;
+import org.apache.accumulo.core.metadata.schema.TabletOperationId;
+import org.apache.accumulo.core.metadata.schema.TabletOperationType;
+import org.apache.accumulo.core.tabletserver.log.LogEntry;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.manager.tableOps.ManagerRepo;
+import org.apache.accumulo.manager.tableOps.merge.DeleteRows;
+import org.apache.accumulo.manager.tableOps.merge.MergeInfo;
+import org.apache.accumulo.manager.tableOps.merge.MergeInfo.Operation;
+import org.apache.accumulo.manager.tableOps.merge.MergeTablets;
+import org.apache.accumulo.manager.tableOps.merge.ReserveTablets;
+import org.apache.accumulo.manager.tableOps.split.FindSplits;
+import org.apache.accumulo.manager.tableOps.split.PreSplit;
+import org.apache.accumulo.server.metadata.TestAmple;
+import org.apache.accumulo.server.metadata.TestAmple.TestServerAmpleImpl;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+public class ManagerRepoIT extends SharedMiniClusterBase {
+
+  @BeforeAll
+  public static void setup() throws Exception {
+    SharedMiniClusterBase.startMiniCluster();
+  }
+
+  @AfterAll
+  public static void teardown() {
+    SharedMiniClusterBase.stopMiniCluster();
+  }
+
+  @ParameterizedTest
+  @EnumSource(MergeInfo.Operation.class)
+  public void testNoWalsMergeRepos(MergeInfo.Operation operation) throws 
Exception {
+    String[] tableNames = getUniqueNames(2);
+    String metadataTable = tableNames[0] + operation;
+    String userTable = tableNames[1] + operation;
+
+    try (ClientContext client =
+        (ClientContext) Accumulo.newClient().from(getClientProps()).build()) {
+      client.tableOperations().create(userTable);
+      TableId tableId = 
TableId.of(client.tableOperations().tableIdMap().get(userTable));
+
+      // Set up Test ample and manager
+      TestAmple.createMetadataTable(client, metadataTable);
+      TestServerAmpleImpl testAmple = (TestServerAmpleImpl) TestAmple
+          .create(getCluster().getServerContext(), Map.of(DataLevel.USER, 
metadataTable));
+      testAmple.createMetadataFromExisting(client, tableId);
+      Manager manager = mockWithAmple(getCluster().getServerContext(), 
testAmple);
+
+      // Create a test operation and fate id for testing merge and delete rows
+      // and add operation to test metadata for the tablet
+      var fateId = FateId.from(FateInstanceType.USER, UUID.randomUUID());
+      var opid = TabletOperationId.from(TabletOperationType.MERGING, fateId);
+      var extent = new KeyExtent(tableId, null, null);
+
+      try (TabletsMutator tm = testAmple.mutateTablets()) {
+        tm.mutateTablet(extent).putOperation(opid).mutate();
+      }
+
+      // Build either MergeTablets or DeleteRows repo for testing no WALs, 
both should check this
+      // condition
+      final MergeInfo mergeInfo = new MergeInfo(tableId,
+          manager.getContext().getNamespaceId(tableId), null, null, operation);
+      final ManagerRepo repo =
+          operation == Operation.MERGE ? new MergeTablets(mergeInfo) : new 
DeleteRows(mergeInfo);
+      // Also test ReserveTablets isReady()
+      final ManagerRepo reserve = new ReserveTablets(mergeInfo);
+
+      // First, check no errors with the default case
+      assertEquals(0, reserve.isReady(fateId, manager));
+      assertNotNull(repo.call(fateId, manager));
+
+      // Write a WAL to the test metadata and then re-run the repo to check 
for an error
+      try (TabletsMutator tm = testAmple.mutateTablets()) {
+        var walFilePath = Path.of("tserver+8080", 
UUID.randomUUID().toString()).toString();
+        
tm.mutateTablet(extent).putWal(LogEntry.fromPath(walFilePath)).mutate();
+      }
+
+      // Should not be ready due to the presence of a WAL
+      assertTrue(reserve.isReady(fateId, manager) > 0);
+
+      // Repo should throw an exception due to the WAL existence
+      var thrown = assertThrows(IllegalStateException.class, () -> 
repo.call(fateId, manager));
+      assertTrue(thrown.getMessage().contains("has unexpected walogs"));
+    }
+  }
+
+  @Test
+  public void testFindSplitsUnsplittable() throws Exception {
+
+    String[] tableNames = getUniqueNames(2);
+    String metadataTable = tableNames[0];
+    String userTable = tableNames[1];
+
+    try (ClientContext client =
+        (ClientContext) Accumulo.newClient().from(getClientProps()).build()) {
+      TestAmple.createMetadataTable(client, metadataTable);
+
+      // Create table with a smaller max end row size
+      createUnsplittableTable(client, userTable);
+      populateUnsplittableTable(client, userTable);
+
+      TableId tableId = 
TableId.of(client.tableOperations().tableIdMap().get(userTable));
+
+      TestServerAmpleImpl testAmple = (TestServerAmpleImpl) TestAmple
+          .create(getCluster().getServerContext(), Map.of(DataLevel.USER, 
metadataTable));
+      // Prevent UNSPLITTABLE_COLUMN just in case a system split tried to run 
on the table
+      // before we copied it and inserted the column
+      testAmple.createMetadataFromExisting(client, tableId,
+          not(SplitColumnFamily.UNSPLITTABLE_COLUMN));
+
+      KeyExtent extent = new KeyExtent(tableId, null, null);
+      Manager manager = mockWithAmple(getCluster().getServerContext(), 
testAmple);
+
+      FindSplits findSplits = new FindSplits(extent);
+      PreSplit preSplit = (PreSplit) findSplits
+          .call(FateId.from(FateInstanceType.USER, UUID.randomUUID()), 
manager);
+
+      // The table should not need splitting
+      assertNull(preSplit);
+
+      // Verify metadata has unsplittable column
+      var metadata = testAmple.readTablet(new KeyExtent(tableId, null, 
null)).getUnSplittable();
+      assertNotNull(metadata);
+
+      // Bump max end row size and verify split occurs and unsplittable column 
is cleaned up
+      client.tableOperations().setProperty(userTable, 
Property.TABLE_MAX_END_ROW_SIZE.getKey(),
+          "500");
+
+      findSplits = new FindSplits(extent);
+      preSplit = (PreSplit) findSplits.call(FateId.from(FateInstanceType.USER, 
UUID.randomUUID()),
+          manager);
+
+      // The table SHOULD now need splitting
+      assertNotNull(preSplit);
+
+      // Verify unsplittable metadata is still the same and exists
+      // This will not be cleared until the UpdateTablets repo runs
+      // so if the test is updated to test UpdateTablets this can be checked
+      assertEquals(metadata,
+          testAmple.readTablet(new KeyExtent(tableId, null, 
null)).getUnSplittable());
+    }
+  }
+
+  private void createUnsplittableTable(ClientContext client, String table) 
throws Exception {
+    // make a table and lower the configuration properties
+    // @formatter:off
+    Map<String,String> props = Map.of(
+        Property.TABLE_SPLIT_THRESHOLD.getKey(), "1K",
+        Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), "none",
+        Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "64",
+        Property.TABLE_MAX_END_ROW_SIZE.getKey(), "" + 100,
+        Property.TABLE_MAJC_RATIO.getKey(), "9999"
+    );
+    // @formatter:on
+    client.tableOperations().create(table, new 
NewTableConfiguration().setProperties(props));
+
+  }
+
+  private void populateUnsplittableTable(ClientContext client, String table) 
throws Exception {
+    byte[] data = new byte[101];
+    Arrays.fill(data, 0, data.length - 2, (byte) 'm');
+
+    final int numOfMutations = 20;
+    try (BatchWriter batchWriter = client.createBatchWriter(table)) {
+      // Make the last place in the key different for every entry added to the 
table
+      for (int i = 0; i < numOfMutations; i++) {
+        data[data.length - 1] = (byte) i;
+        Mutation m = new Mutation(data);
+        m.put("cf", "cq", "value");
+        batchWriter.addMutation(m);
+      }
+    }
+    client.tableOperations().flush(table, null, null, true);
+  }
+}

Reply via email to